From 7acab3ab45df421601ee9a076a61de00561a0308 Mon Sep 17 00:00:00 2001
From: Mridul Muralidharan <mridul@gmail.com>
Date: Mon, 22 Apr 2013 08:01:13 +0530
Subject: [PATCH] Fix review comments, add a new api to SparkHadoopUtil to
 create appropriate Configuration. Modify an example to show how to use
 SplitInfo

---
 .../scala/spark/deploy/SparkHadoopUtil.scala       |  5 +++++
 .../scala/spark/deploy/SparkHadoopUtil.scala       |  6 +++++-
 .../scala/spark/deploy/SparkHadoopUtil.scala       |  5 +++++
 core/src/main/scala/spark/SparkContext.scala       | 14 +++++++++-----
 core/src/main/scala/spark/Utils.scala              |  8 +++-----
 core/src/main/scala/spark/rdd/CheckpointRDD.scala  |  7 ++++---
 .../main/scala/spark/examples/SparkHdfsLR.scala    | 10 ++++++++--
 project/SparkBuild.scala                           | 10 ++++++----
 8 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
index d4badbc5c4..a0fb4fe25d 100644
--- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,4 +1,6 @@
 package spark.deploy
+import org.apache.hadoop.conf.Configuration
+
 
 /**
  * Contains util methods to interact with Hadoop from spark.
@@ -15,4 +17,7 @@ object SparkHadoopUtil {
     // Add support, if exists - for now, simply run func !
     func(args)
   }
+
+  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+  def newConfiguration(): Configuration = new Configuration()
 }
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
index 66e5ad8491..ab1ab9d8a7 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
@@ -12,7 +12,7 @@ import java.security.PrivilegedExceptionAction
  */
 object SparkHadoopUtil {
 
-  val yarnConf = new YarnConfiguration(new Configuration())
+  val yarnConf = newConfiguration()
 
   def getUserNameFromEnvironment(): String = {
     // defaulting to env if -D is not present ...
@@ -56,4 +56,8 @@ object SparkHadoopUtil {
   def setYarnMode(env: HashMap[String, String]) {
     env("SPARK_YARN_MODE") = "true"
   }
+
+  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+  // Always create a new config, dont reuse yarnConf.
+  def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
 }
diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
index d4badbc5c4..a0fb4fe25d 100644
--- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,4 +1,6 @@
 package spark.deploy
+import org.apache.hadoop.conf.Configuration
+
 
 /**
  * Contains util methods to interact with Hadoop from spark.
@@ -15,4 +17,7 @@ object SparkHadoopUtil {
     // Add support, if exists - for now, simply run func !
     func(args)
   }
+
+  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+  def newConfiguration(): Configuration = new Configuration()
 }
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index e853bce2c4..5f5ec0b0f4 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
 import org.apache.mesos.MesosNativeLibrary
 
-import spark.deploy.LocalSparkCluster
+import spark.deploy.{SparkHadoopUtil, LocalSparkCluster}
 import spark.partial.ApproximateEvaluator
 import spark.partial.PartialResult
 import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
@@ -102,7 +102,9 @@ class SparkContext(
 
 
   // Add each JAR given through the constructor
-  if (jars != null) jars.foreach { addJar(_) }
+  if (jars != null) {
+    jars.foreach { addJar(_) }
+  }
 
   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
@@ -114,7 +116,9 @@ class SparkContext(
       executorEnvs(key) = value
     }
   }
-  if (environment != null) executorEnvs ++= environment
+  if (environment != null) {
+    executorEnvs ++= environment
+  }
 
   // Create and start the scheduler
   private var taskScheduler: TaskScheduler = {
@@ -207,7 +211,7 @@ class SparkContext(
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = {
-    val conf = new Configuration()
+    val conf = SparkHadoopUtil.newConfiguration()
     // Explicitly check for S3 environment variables
     if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
       conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
@@ -711,7 +715,7 @@ class SparkContext(
    */
   def setCheckpointDir(dir: String, useExisting: Boolean = false) {
     val path = new Path(dir)
-    val fs = path.getFileSystem(new Configuration())
+    val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
     if (!useExisting) {
       if (fs.exists(path)) {
         throw new Exception("Checkpoint directory '" + path + "' already exists.")
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 3e54fa7a7e..9f48cbe490 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -4,7 +4,6 @@ import java.io._
 import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.collection.JavaConversions._
@@ -208,7 +207,7 @@ private object Utils extends Logging {
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
         val uri = new URI(url)
-        val conf = new Configuration()
+        val conf = SparkHadoopUtil.newConfiguration()
         val fs = FileSystem.get(uri, conf)
         val in = fs.open(new Path(uri))
         val out = new FileOutputStream(tempFile)
@@ -317,7 +316,6 @@ private object Utils extends Logging {
    * Get the local machine's hostname.
    */
   def localHostName(): String = {
-    // customHostname.getOrElse(InetAddress.getLocalHost.getHostName)
     customHostname.getOrElse(localIpAddressHostname)
   }
 
@@ -337,6 +335,7 @@ private object Utils extends Logging {
     retval
   }
 
+  /*
   // Used by DEBUG code : remove when all testing done
   private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$")
   def checkHost(host: String, message: String = "") {
@@ -358,12 +357,11 @@ private object Utils extends Logging {
       Utils.logErrorWithStack("Unexpected to have port " + port + " which is not valid in " + hostPort + ". Message " + message)
     }
   }
+  */
 
   // Once testing is complete in various modes, replace with this ?
-  /*
   def checkHost(host: String, message: String = "") {}
   def checkHostPort(hostPort: String, message: String = "") {}
-  */
 
   def getUserNameFromEnvironment(): String = {
     SparkHadoopUtil.getUserNameFromEnvironment
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 24d527f38f..79d00edee7 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -8,6 +8,7 @@ import org.apache.hadoop.util.ReflectionUtils
 import org.apache.hadoop.fs.Path
 import java.io.{File, IOException, EOFException}
 import java.text.NumberFormat
+import spark.deploy.SparkHadoopUtil
 
 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
 
@@ -65,7 +66,7 @@ private[spark] object CheckpointRDD extends Logging {
 
   def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
     val outputDir = new Path(path)
-    val fs = outputDir.getFileSystem(new Configuration())
+    val fs = outputDir.getFileSystem(SparkHadoopUtil.newConfiguration())
 
     val finalOutputName = splitIdToFile(ctx.splitId)
     val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -103,7 +104,7 @@ private[spark] object CheckpointRDD extends Logging {
   }
 
   def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
-    val fs = path.getFileSystem(new Configuration())
+    val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = SparkEnv.get.serializer.newInstance()
@@ -125,7 +126,7 @@ private[spark] object CheckpointRDD extends Logging {
     val sc = new SparkContext(cluster, "CheckpointRDD Test")
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
-    val fs = path.getFileSystem(new Configuration())
+    val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
     sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
index 0f42f405a0..3d080a0257 100644
--- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -4,6 +4,8 @@ import java.util.Random
 import scala.math.exp
 import spark.util.Vector
 import spark._
+import spark.deploy.SparkHadoopUtil
+import spark.scheduler.InputFormatInfo
 
 /**
  * Logistic regression based classification.
@@ -32,9 +34,13 @@ object SparkHdfsLR {
       System.err.println("Usage: SparkHdfsLR <master> <file> <iters>")
       System.exit(1)
     }
+    val inputPath = args(1)
+    val conf = SparkHadoopUtil.newConfiguration()
     val sc = new SparkContext(args(0), "SparkHdfsLR",
-      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
-    val lines = sc.textFile(args(1))
+      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
+      InputFormatInfo.computePreferredLocations(
+        Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))))
+    val lines = sc.textFile(inputPath)
     val points = lines.map(parsePoint _).cache()
     val ITERATIONS = args(2).toInt
 
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 91e3123bc5..0a5b89d927 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -47,10 +47,8 @@ object SparkBuild extends Build {
     scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"),
     unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
     retrieveManaged := true,
-    // retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
     transitiveClassifiers in Scope.GlobalScope := Seq("sources"),
-    // For some reason this fails on some nodes and works on others - not yet debugged why
-    // testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))),
+    testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))),
 
     // shared between both core and streaming.
     resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
@@ -170,7 +168,11 @@ object SparkBuild extends Build {
         Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION)
       }),
     unmanagedSourceDirectories in Compile <+= baseDirectory{ _ /
-      ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") "src/hadoop2-yarn/scala" else "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala" )
+      ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") {
+        "src/hadoop2-yarn/scala"
+      } else {
+        "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala"
+      } )
     }
   ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings
 
-- 
GitLab
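
Usage note (outside the patch itself): a minimal sketch of how a driver program combines the two pieces this change introduces, SparkHadoopUtil.newConfiguration() to obtain the Configuration subclass appropriate to the active Hadoop profile, and InputFormatInfo.computePreferredLocations to seed SparkContext with preferred node locations, mirroring the modified SparkHdfsLR example above. The object name, master URL and input path are placeholders, not values taken from the patch.

import spark.SparkContext
import spark.deploy.SparkHadoopUtil
import spark.scheduler.InputFormatInfo

object NewConfigurationSketch {
  def main(args: Array[String]) {
    // Placeholder values for illustration only; a real job would take these from args.
    val master = "local"
    val inputPath = "hdfs:///tmp/lr_data.txt"

    // Build-appropriate Configuration: a plain Configuration under hadoop1/hadoop2,
    // a YarnConfiguration under the hadoop2-yarn profile.
    val conf = SparkHadoopUtil.newConfiguration()

    // Derive preferred locations from the input's splits and hand them to SparkContext,
    // the same way the modified SparkHdfsLR example does.
    val preferredLocations = InputFormatInfo.computePreferredLocations(
      Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)))

    val sc = new SparkContext(master, "NewConfigurationSketch",
      System.getenv("SPARK_HOME"), Nil, Map(), preferredLocations)

    println("line count: " + sc.textFile(inputPath).count())
    sc.stop()
  }
}

Routing through newConfiguration() instead of calling new Configuration() directly lets the YARN build return its own subclass (and perform any Hadoop subsystem initialization that creating the config triggers), which is why the patch replaces the direct constructions in SparkContext, Utils and CheckpointRDD.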