diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala index d4badbc5c4254dc5694393dd051a0706203a21ea..a0fb4fe25d188b02f1ac62245eaaa0de86b5b46c 100644 --- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala @@ -1,4 +1,6 @@ package spark.deploy +import org.apache.hadoop.conf.Configuration + /** * Contains util methods to interact with Hadoop from spark. @@ -15,4 +17,7 @@ object SparkHadoopUtil { // Add support, if exists - for now, simply run func ! func(args) } + + // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems + def newConfiguration(): Configuration = new Configuration() } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala index 66e5ad8491064e683802ccbc6c2a1e93b844f030..ab1ab9d8a7a8e8530938f6a1ddc4c860cffb1af0 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala @@ -12,7 +12,7 @@ import java.security.PrivilegedExceptionAction */ object SparkHadoopUtil { - val yarnConf = new YarnConfiguration(new Configuration()) + val yarnConf = newConfiguration() def getUserNameFromEnvironment(): String = { // defaulting to env if -D is not present ... @@ -56,4 +56,8 @@ object SparkHadoopUtil { def setYarnMode(env: HashMap[String, String]) { env("SPARK_YARN_MODE") = "true" } + + // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems + // Always create a new config, dont reuse yarnConf. + def newConfiguration(): Configuration = new YarnConfiguration(new Configuration()) } diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala index d4badbc5c4254dc5694393dd051a0706203a21ea..a0fb4fe25d188b02f1ac62245eaaa0de86b5b46c 100644 --- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala @@ -1,4 +1,6 @@ package spark.deploy +import org.apache.hadoop.conf.Configuration + /** * Contains util methods to interact with Hadoop from spark. @@ -15,4 +17,7 @@ object SparkHadoopUtil { // Add support, if exists - for now, simply run func ! func(args) } + + // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems + def newConfiguration(): Configuration = new Configuration() } diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index e853bce2c425428dea89c33b2e3dd4128479557b..5f5ec0b0f4d2992571fa06247dc3cdc13ac17c42 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.hadoop.mapreduce.{Job => NewHadoopJob} import org.apache.mesos.MesosNativeLibrary -import spark.deploy.LocalSparkCluster +import spark.deploy.{SparkHadoopUtil, LocalSparkCluster} import spark.partial.ApproximateEvaluator import spark.partial.PartialResult import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} @@ -102,7 +102,9 @@ class SparkContext( // Add each JAR given through the constructor - if (jars != null) jars.foreach { addJar(_) } + if (jars != null) { + jars.foreach { addJar(_) } + } // Environment variables to pass to our executors private[spark] val executorEnvs = HashMap[String, String]() @@ -114,7 +116,9 @@ class SparkContext( executorEnvs(key) = value } } - if (environment != null) executorEnvs ++= environment + if (environment != null) { + executorEnvs ++= environment + } // Create and start the scheduler private var taskScheduler: TaskScheduler = { @@ -207,7 +211,7 @@ class SparkContext( /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = { - val conf = new Configuration() + val conf = SparkHadoopUtil.newConfiguration() // Explicitly check for S3 environment variables if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) @@ -711,7 +715,7 @@ class SparkContext( */ def setCheckpointDir(dir: String, useExisting: Boolean = false) { val path = new Path(dir) - val fs = path.getFileSystem(new Configuration()) + val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) if (!useExisting) { if (fs.exists(path)) { throw new Exception("Checkpoint directory '" + path + "' already exists.") diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 3e54fa7a7e79aa2a3e295819bf2a11f403c6fc82..9f48cbe490a2703fe279c602d831f6b26794c556 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -4,7 +4,6 @@ import java.io._ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket} import java.util.{Locale, Random, UUID} import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor} -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem, FileUtil} import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.collection.JavaConversions._ @@ -208,7 +207,7 @@ private object Utils extends Logging { case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others val uri = new URI(url) - val conf = new Configuration() + val conf = SparkHadoopUtil.newConfiguration() val fs = FileSystem.get(uri, conf) val in = fs.open(new Path(uri)) val out = new FileOutputStream(tempFile) @@ -317,7 +316,6 @@ private object Utils extends Logging { * Get the local machine's hostname. */ def localHostName(): String = { - // customHostname.getOrElse(InetAddress.getLocalHost.getHostName) customHostname.getOrElse(localIpAddressHostname) } @@ -337,6 +335,7 @@ private object Utils extends Logging { retval } + /* // Used by DEBUG code : remove when all testing done private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$") def checkHost(host: String, message: String = "") { @@ -358,12 +357,11 @@ private object Utils extends Logging { Utils.logErrorWithStack("Unexpected to have port " + port + " which is not valid in " + hostPort + ". Message " + message) } } + */ // Once testing is complete in various modes, replace with this ? - /* def checkHost(host: String, message: String = "") {} def checkHostPort(hostPort: String, message: String = "") {} - */ def getUserNameFromEnvironment(): String = { SparkHadoopUtil.getUserNameFromEnvironment diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 24d527f38fa0280075a5d2b8cce50f2460020c88..79d00edee78ecba5f7571e72022b20c61d9c5ecf 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -8,6 +8,7 @@ import org.apache.hadoop.util.ReflectionUtils import org.apache.hadoop.fs.Path import java.io.{File, IOException, EOFException} import java.text.NumberFormat +import spark.deploy.SparkHadoopUtil private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} @@ -65,7 +66,7 @@ private[spark] object CheckpointRDD extends Logging { def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) { val outputDir = new Path(path) - val fs = outputDir.getFileSystem(new Configuration()) + val fs = outputDir.getFileSystem(SparkHadoopUtil.newConfiguration()) val finalOutputName = splitIdToFile(ctx.splitId) val finalOutputPath = new Path(outputDir, finalOutputName) @@ -103,7 +104,7 @@ private[spark] object CheckpointRDD extends Logging { } def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = { - val fs = path.getFileSystem(new Configuration()) + val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt val fileInputStream = fs.open(path, bufferSize) val serializer = SparkEnv.get.serializer.newInstance() @@ -125,7 +126,7 @@ private[spark] object CheckpointRDD extends Logging { val sc = new SparkContext(cluster, "CheckpointRDD Test") val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000) val path = new Path(hdfsPath, "temp") - val fs = path.getFileSystem(new Configuration()) + val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala index 0f42f405a058cf9f2a218004b47c66d2330810d6..3d080a02577a38cdd2eeb1cd30a5c5e7b6fed629 100644 --- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala @@ -4,6 +4,8 @@ import java.util.Random import scala.math.exp import spark.util.Vector import spark._ +import spark.deploy.SparkHadoopUtil +import spark.scheduler.InputFormatInfo /** * Logistic regression based classification. @@ -32,9 +34,13 @@ object SparkHdfsLR { System.err.println("Usage: SparkHdfsLR <master> <file> <iters>") System.exit(1) } + val inputPath = args(1) + val conf = SparkHadoopUtil.newConfiguration() val sc = new SparkContext(args(0), "SparkHdfsLR", - System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - val lines = sc.textFile(args(1)) + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(), + InputFormatInfo.computePreferredLocations( + Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)))) + val lines = sc.textFile(inputPath) val points = lines.map(parsePoint _).cache() val ITERATIONS = args(2).toInt diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 91e3123bc5eb68da4fc605551a0873ebd7227353..0a5b89d92704f65ceee98d05c9de182afb0240ff 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -47,10 +47,8 @@ object SparkBuild extends Build { scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, retrieveManaged := true, - // retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", transitiveClassifiers in Scope.GlobalScope := Seq("sources"), - // For some reason this fails on some nodes and works on others - not yet debugged why - // testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), + testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), // shared between both core and streaming. resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"), @@ -170,7 +168,11 @@ object SparkBuild extends Build { Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION) }), unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / - ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") "src/hadoop2-yarn/scala" else "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala" ) + ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") { + "src/hadoop2-yarn/scala" + } else { + "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala" + } ) } ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings