diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 91e35e2d34e2ef4d5db0b96a04139e0daa9971ee..7c32e0ab9bcb5c41ecfad1688062d66abce86241 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.Utils @@ -60,14 +60,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private var isLastAMRetry: Boolean = true private var amClient: AMRMClient[ContainerRequest] = _ + private val sparkConf = new SparkConf() // Default to numWorkers * 2, with minimum of 3 - private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures", - math.max(args.numWorkers * 2, 3).toString()).toInt + private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", + math.max(args.numWorkers * 2, 3)) def run() { // Setup the directories so things go to YARN approved directories rather // than user specified and /tmp. - conf.set("spark.local.dir", getLocalDirs()) + System.setProperty("spark.local.dir", getLocalDirs()) + + // set the web ui port to be ephemeral for yarn so we don't conflict with + // other spark processes running on the same box + System.setProperty("spark.ui.port", "0") // Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using. ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) @@ -89,8 +94,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. - waitForSparkMaster() - waitForSparkContextInitialized() // Do this after Spark master is up and SparkContext is created so that we can register UI Url. @@ -134,30 +137,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) } - private def waitForSparkMaster() { - logInfo("Waiting for Spark driver to be reachable.") - var driverUp = false - var tries = 0 - val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt - while (!driverUp && tries < numTries) { - val driverHost = conf.get("spark.driver.host") - val driverPort = conf.get("spark.driver.port") - try { - val socket = new Socket(driverHost, driverPort.toInt) - socket.close() - logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) - driverUp = true - } catch { - case e: Exception => { - logWarning("Failed to connect to driver at %s:%s, retrying ...". 
- format(driverHost, driverPort)) - Thread.sleep(100) - tries = tries + 1 - } - } - } - } - private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") val mainMethod = Class.forName( @@ -199,7 +178,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ApplicationMaster.sparkContextRef.synchronized { var numTries = 0 val waitTime = 10000L - val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt + val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) { logInfo("Waiting for Spark context initialization ... " + numTries) numTries = numTries + 1 @@ -215,7 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e amClient, appAttemptId, args, - sparkContext.preferredNodeLocationData) + sparkContext.preferredNodeLocationData, + sparkContext.getConf) } else { logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d". format(numTries * waitTime, maxNumTries)) @@ -223,7 +203,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e yarnConf, amClient, appAttemptId, - args) + args, + sparkContext.getConf) } } } finally { @@ -265,7 +246,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = - conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) + // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) @@ -343,7 +325,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private def cleanupStagingDir() { var stagingDirPath: Path = null try { - val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean + val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean if (!preserveFiles) { stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) if (stagingDirPath == null) { diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1bba6a5ae4ac86ccb2b3986e5704b3bdc3825353..a75066888c22f63ae374399cfbcb239abcdc5808 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, Records} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.util.Utils import org.apache.spark.deploy.SparkHadoopUtil @@ -57,6 +57,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val credentials = UserGroupInformation.getCurrentUser().getCredentials() private val SPARK_STAGING: String = ".sparkStaging" private val distCacheMgr = new ClientDistributedCacheManager() + private val sparkConf = new SparkConf + // Staging directory is private! 
-> rwx-------- val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short) @@ -244,7 +246,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort + val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort if (UserGroupInformation.isSecurityEnabled()) { val dstFs = dst.getFileSystem(conf) @@ -437,7 +439,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } def monitorApplication(appId: ApplicationId): Boolean = { - val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong + val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) while (true) { Thread.sleep(interval) @@ -501,7 +503,7 @@ object Client { Path.SEPARATOR + LOG4J_PROP) } // Normally the users app.jar is last in case conflicts with spark jars - val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false") + val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false") .toBoolean if (userClasspathFirst) { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1a9bb97b3ececa8606feb06afd6bbb2cd846f7af..7aac2328dad60794c35142a64fc18b87a5a97973 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import scala.collection.mutable.{ArrayBuffer, HashMap} +import org.apache.spark.SparkConf import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo} import org.apache.spark.util.IntParam import org.apache.spark.util.MemoryParam @@ -35,7 +36,7 @@ class ClientArguments(val args: Array[String]) { var workerMemory = 1024 // MB var workerCores = 1 var numWorkers = 2 - var amQueue = conf.getOrElse("QUEUE", "default") + var amQueue = new SparkConf().get("QUEUE", "default") var amMemory: Int = 512 // MB var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" var appName: String = "Spark" diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index f7d73f0d83f474894c1f9a1bb21cda4788e41cf3..99b824e1295a7d87b9e7753eb353eaf32c1f2673 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo @@ -47,8 +47,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte private var driverClosed:Boolean = false private var amClient: AMRMClient[ContainerRequest] = _ + private val sparkConf = new SparkConf - val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", 
Utils.localHostName, 0)._1 + val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + conf = sparkConf)._1 var actor: ActorRef = _ // This actor just working as a monitor to watch on Driver Actor. @@ -136,8 +138,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte Thread.sleep(100) } } - conf.set("spark.driver.host", driverHost) - conf.set("spark.driver.port", driverPort.toString) + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) @@ -157,7 +159,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte amClient, appAttemptId, args, - preferredNodeLocationData) + preferredNodeLocationData, + sparkConf) logInfo("Allocating " + args.numWorkers + " workers.") // Wait until all containers have finished diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index abc3447746f9e27d9db43cc7cd12e078f12b941c..85ab08ef34db4738f53f95d3d68dadde555df3cb 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -26,7 +26,7 @@ import scala.collection import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -64,7 +64,8 @@ private[yarn] class YarnAllocationHandler( val workerMemory: Int, val workerCores: Int, val preferredHostToCount: Map[String, Int], - val preferredRackToCount: Map[String, Int]) + val preferredRackToCount: Map[String, Int], + val sparkConf: SparkConf) extends Logging { // These three are locked on allocatedHostToContainersMap. 
Complementary data structures // allocatedHostToContainersMap : containers which are running : host, Set<containerid> @@ -254,8 +255,8 @@ private[yarn] class YarnAllocationHandler( } else { val workerId = workerIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( - conf.get("spark.driver.host"), - conf.get("spark.driver.port"), + sparkConf.get("spark.driver.host"), + sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("Launching container %s for on host %s".format(containerId, workerHostname)) @@ -565,7 +566,8 @@ object YarnAllocationHandler { conf: Configuration, amClient: AMRMClient[ContainerRequest], appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments + args: ApplicationMasterArguments, + sparkConf: SparkConf ): YarnAllocationHandler = { new YarnAllocationHandler( conf, @@ -575,7 +577,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, Map[String, Int](), - Map[String, Int]()) + Map[String, Int](), + sparkConf) } def newAllocator( @@ -584,7 +587,8 @@ object YarnAllocationHandler { appAttemptId: ApplicationAttemptId, args: ApplicationMasterArguments, map: collection.Map[String, - collection.Set[SplitInfo]] + collection.Set[SplitInfo]], + sparkConf: SparkConf ): YarnAllocationHandler = { val (hostToSplitCount, rackToSplitCount) = generateNodeToWeight(conf, map) new YarnAllocationHandler( @@ -595,7 +599,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, hostToSplitCount, - rackToSplitCount) + rackToSplitCount, + sparkConf) } def newAllocator( @@ -605,7 +610,8 @@ object YarnAllocationHandler { maxWorkers: Int, workerMemory: Int, workerCores: Int, - map: collection.Map[String, collection.Set[SplitInfo]] + map: collection.Map[String, collection.Set[SplitInfo]], + sparkConf: SparkConf ): YarnAllocationHandler = { val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) new YarnAllocationHandler( @@ -616,7 +622,8 @@ object YarnAllocationHandler { workerMemory, workerCores, hostToCount, - rackToCount) + rackToCount, + sparkConf) } // A simple method to copy the split info map. 
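
Note on the pattern above (illustrative only, not part of the patch): in the new-yarn tree the diff consistently swaps ad-hoc `conf.getOrElse(...).toInt`/system-property reads for typed lookups on a `SparkConf`, while still seeding a couple of JVM properties that a later `SparkConf` will pick up. The sketch below condenses that pattern in one place; `YarnConfReadSketch` and `numWorkers` are stand-ins I introduce for illustration, and it assumes the `SparkConf` getters already used in this patch (`getInt`, `getLong`, `get` with a default).

// Minimal sketch of the configuration-read pattern this patch applies.
// Not patch code: object name and numWorkers value are placeholders.
import org.apache.spark.SparkConf

object YarnConfReadSketch {
  def main(args: Array[String]) {
    // Properties set on the JVM before a SparkConf is constructed are still visible,
    // because new SparkConf() loads "spark.*" system properties by default. That is
    // why the AM keeps System.setProperty for spark.local.dir and spark.ui.port.
    System.setProperty("spark.ui.port", "0")

    val sparkConf = new SparkConf()
    val numWorkers = 2  // stand-in for args.numWorkers
    val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
      math.max(numWorkers * 2, 3))
    val heartbeatIntervalMs = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
    val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean

    println("maxFailures=%d, heartbeat=%dms, preserveStaging=%s".format(
      maxNumWorkerFailures, heartbeatIntervalMs, preserveFiles))
  }
}

The same substitution is then repeated, file by file, in the stable yarn/ tree below.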
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index dc9228180f60a1b1564a9ec1d13fa6fc28a6a26c..7cf120d3eb8a47a966f1263610b2678262f85652 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -36,10 +36,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.Utils - class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging { def this(args: ApplicationMasterArguments) = this(args, new Configuration()) @@ -57,14 +56,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true - // default to numWorkers * 2, with minimum of 3 - private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures", - math.max(args.numWorkers * 2, 3).toString()).toInt + + private val sparkConf = new SparkConf() + // Default to numWorkers * 2, with minimum of 3 + private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", + math.max(args.numWorkers * 2, 3)) def run() { // Setup the directories so things go to yarn approved directories rather // then user specified and /tmp. - conf.set("spark.local.dir", getLocalDirs()) + System.setProperty("spark.local.dir", getLocalDirs()) + + // set the web ui port to be ephemeral for yarn so we don't conflict with + // other spark processes running on the same box + System.setProperty("spark.ui.port", "0") // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using. ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) @@ -99,8 +104,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. - waitForSparkMaster() - waitForSparkContextInitialized() // Do this after spark master is up and SparkContext is created so that we can register UI Url @@ -161,30 +164,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e resourceManager.registerApplicationMaster(appMasterRequest) } - private def waitForSparkMaster() { - logInfo("Waiting for spark driver to be reachable.") - var driverUp = false - var tries = 0 - val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt - while(!driverUp && tries < numTries) { - val driverHost = conf.get("spark.driver.host") - val driverPort = conf.get("spark.driver.port") - try { - val socket = new Socket(driverHost, driverPort.toInt) - socket.close() - logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) - driverUp = true - } catch { - case e: Exception => { - logWarning("Failed to connect to driver at %s:%s, retrying ...". 
- format(driverHost, driverPort)) - Thread.sleep(100) - tries = tries + 1 - } - } - } - } - private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") val mainMethod = Class.forName( @@ -226,7 +205,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ApplicationMaster.sparkContextRef.synchronized { var count = 0 val waitTime = 10000L - val numTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt + val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10) while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) { logInfo("Waiting for spark context initialization ... " + count) count = count + 1 @@ -242,7 +221,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e resourceManager, appAttemptId, args, - sparkContext.preferredNodeLocationData) + sparkContext.preferredNodeLocationData, + sparkContext.getConf) } else { logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d". format(count * waitTime, numTries)) @@ -250,7 +230,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e yarnConf, resourceManager, appAttemptId, - args) + args, + sparkContext.getConf) } } } finally { @@ -294,7 +275,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = - conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) @@ -377,7 +358,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private def cleanupStagingDir() { var stagingDirPath: Path = null try { - val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean + val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean if (!preserveFiles) { stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) if (stagingDirPath == null) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 595a7ee8c3d83c329e07268a05675d4aac646812..2bd047c97adbab917c069c83dc2abc9844e421c6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, Records} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.util.Utils import org.apache.spark.deploy.SparkHadoopUtil @@ -54,6 +54,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val credentials = UserGroupInformation.getCurrentUser().getCredentials() private val SPARK_STAGING: String = ".sparkStaging" private val distCacheMgr = new ClientDistributedCacheManager() + private val sparkConf = new SparkConf // Staging directory is private! 
-> rwx-------- val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short) @@ -230,7 +231,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort + val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort if (UserGroupInformation.isSecurityEnabled()) { val dstFs = dst.getFileSystem(conf) @@ -422,7 +423,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } def monitorApplication(appId: ApplicationId): Boolean = { - val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong + val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) while (true) { Thread.sleep(interval) @@ -485,8 +486,7 @@ object Client { Path.SEPARATOR + LOG4J_PROP) } // Normally the users app.jar is last in case conflicts with spark jars - val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false") - .toBoolean + val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean if (userClasspathFirst) { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + APP_JAR) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index e9e46a193b145deae0194228e51afd1128ee5056..9075ca71e7efc38fa83068e91ffa78db400b4ec7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.yarn +import org.apache.spark.SparkConf import org.apache.spark.util.MemoryParam import org.apache.spark.util.IntParam import collection.mutable.{ArrayBuffer, HashMap} @@ -33,7 +34,7 @@ class ClientArguments(val args: Array[String]) { var workerMemory = 1024 var workerCores = 1 var numWorkers = 2 - var amQueue = conf.getOrElse("QUEUE", "default") + var amQueue = new SparkConf().get("QUEUE", "default") var amMemory: Int = 512 var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster" var appName: String = "Spark" diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index c1e79cbe665a5835aa935960b777f3975da8f194..a8de89c67081187843d7af7dfa708dd6afa997e3 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo @@ -46,8 +46,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte private var yarnAllocator: YarnAllocationHandler = null private var driverClosed:Boolean = false + private val sparkConf = new SparkConf - val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0)._1 + val actorSystem : ActorSystem = 
AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + conf = sparkConf)._1 var actor: ActorRef = null // This actor just working as a monitor to watch on Driver Actor. @@ -162,8 +164,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte Thread.sleep(100) } } - conf.set("spark.driver.host", driverHost) - conf.set("spark.driver.port", driverPort.toString) + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) @@ -175,9 +177,11 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte private def allocateWorkers() { // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. - val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map() + val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = + scala.collection.immutable.Map() - yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, preferredNodeLocationData) + yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, + args, preferredNodeLocationData, sparkConf) logInfo("Allocating " + args.numWorkers + " workers.") // Wait until all containers have finished diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 5966a0f7577572e277277f56c41928b4ce93d59b..c8af653b3ffab9191b586a015902b616e1952eff 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -26,7 +26,7 @@ import scala.collection import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -62,7 +62,8 @@ private[yarn] class YarnAllocationHandler( val workerMemory: Int, val workerCores: Int, val preferredHostToCount: Map[String, Int], - val preferredRackToCount: Map[String, Int]) + val preferredRackToCount: Map[String, Int], + val sparkConf: SparkConf) extends Logging { // These three are locked on allocatedHostToContainersMap. 
Complementary data structures // allocatedHostToContainersMap : containers which are running : host, Set<containerid> @@ -239,7 +240,7 @@ private[yarn] class YarnAllocationHandler( // (workerIdCounter) val workerId = workerIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( - conf.get("spark.driver.host"), conf.get("spark.driver.port"), + sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("launching container on " + containerId + " host " + workerHostname) @@ -552,7 +553,8 @@ object YarnAllocationHandler { conf: Configuration, resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments): YarnAllocationHandler = { + args: ApplicationMasterArguments, + sparkConf: SparkConf): YarnAllocationHandler = { new YarnAllocationHandler( conf, @@ -562,7 +564,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, Map[String, Int](), - Map[String, Int]()) + Map[String, Int](), + sparkConf) } def newAllocator( @@ -571,7 +574,8 @@ object YarnAllocationHandler { appAttemptId: ApplicationAttemptId, args: ApplicationMasterArguments, map: collection.Map[String, - collection.Set[SplitInfo]]): YarnAllocationHandler = { + collection.Set[SplitInfo]], + sparkConf: SparkConf): YarnAllocationHandler = { val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) new YarnAllocationHandler( @@ -582,7 +586,8 @@ object YarnAllocationHandler { args.workerMemory, args.workerCores, hostToCount, - rackToCount) + rackToCount, + sparkConf) } def newAllocator( @@ -592,7 +597,8 @@ object YarnAllocationHandler { maxWorkers: Int, workerMemory: Int, workerCores: Int, - map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = { + map: collection.Map[String, collection.Set[SplitInfo]], + sparkConf: SparkConf): YarnAllocationHandler = { val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) @@ -604,7 +610,8 @@ object YarnAllocationHandler { workerMemory, workerCores, hostToCount, - rackToCount) + rackToCount, + sparkConf) } // A simple method to copy the split info map.
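
Closing note (illustrative only): the second half of the change threads a `SparkConf` through the allocation path (`WorkerLauncher` -> `YarnAllocationHandler.newAllocator` -> `YarnAllocationHandler`) so the driver host and port come from Spark configuration rather than the Hadoop `Configuration`. The sketch below shows that shape; it is not code from the patch — `DriverUrlSketch`, the host/port values, and the literal actor name are placeholders (the real code passes `CoarseGrainedSchedulerBackend.ACTOR_NAME`).

// Hedged sketch of constructor-injected SparkConf, mirroring the new allocator wiring.
import org.apache.spark.SparkConf

class DriverUrlSketch(sparkConf: SparkConf) {
  // Same format string the allocation handler uses when launching worker containers.
  def driverUrl(actorName: String): String =
    "akka.tcp://spark@%s:%s/user/%s".format(
      sparkConf.get("spark.driver.host"),
      sparkConf.get("spark.driver.port"),
      actorName)
}

object DriverUrlSketch {
  def main(args: Array[String]) {
    // WorkerLauncher sets these two keys on its SparkConf once the driver is reachable;
    // here we set placeholder values directly. loadDefaults = false keeps the sketch hermetic.
    val sparkConf = new SparkConf(false)
      .set("spark.driver.host", "localhost")
      .set("spark.driver.port", "7077")
    println(new DriverUrlSketch(sparkConf).driverUrl("CoarseGrainedScheduler"))
  }
}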