From b27b75f1c595139bdcebbadb43e89b0a7eadf2b5 Mon Sep 17 00:00:00 2001
From: liguoqiang <liguoqiang@rd.tuan800.com>
Date: Fri, 3 Jan 2014 15:34:24 +0800
Subject: [PATCH] Modify spark on yarn to create SparkConf process

---
 .../spark/deploy/yarn/ApplicationMaster.scala | 20 ++++++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala | 10 +++++----
 .../spark/deploy/yarn/WorkerLauncher.scala    | 20 +++++++++--------
 .../spark/deploy/yarn/ApplicationMaster.scala | 21 +++++++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala | 18 ++++++++-------
 .../spark/deploy/yarn/ClientArguments.scala   |  2 +-
 .../spark/deploy/yarn/WorkerLauncher.scala    | 22 ++++++++++---------
 7 files changed, 65 insertions(+), 48 deletions(-)

diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 609e4e47e3..69ae14ce83 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,9 +42,11 @@ import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+    sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -115,7 +117,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-      .getOrElse(""))
+        .getOrElse(""))
 
     if (localDirs.isEmpty()) {
       throw new Exception("Yarn Local dirs can't be empty")
@@ -137,11 +139,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
   }
 
-  private def startUserClass(): Thread  = {
+  private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
       args.userClass,
-      false /* initialize */,
+      false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
@@ -257,7 +259,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
   }
 
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -316,7 +318,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     logInfo("finishApplicationMaster with " + status)
     // Set tracking URL to empty since we don't have a history server.
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
   }
 
   /**
@@ -351,6 +353,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
     }
   }
+
 }
 
 object ApplicationMaster {
@@ -401,6 +404,7 @@ object ApplicationMaster {
         // This is not only logs, but also ensures that log system is initialized for this instance
         // when we are actually 'run'-ing.
         logInfo("Adding shutdown hook for context " + sc)
+
         override def run() {
           logInfo("Invoking sc stop from shutdown hook")
           sc.stop()
@@ -409,7 +413,7 @@ object ApplicationMaster {
           master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
         }
       }
-    } )
+    })
   }
 
   // Wait for initialization to complete and atleast 'some' nodes can get allocated.
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 952171cd0a..440ad5cde5 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -50,9 +50,11 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
  * which will launch a Spark master process and negotiate resources throughout its duration.
  */
-class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+  extends YarnClientImpl with Logging {
 
-  def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ClientArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ClientArguments) = this(args, new SparkConf())
 
@@ -143,7 +145,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
       clusterMetrics.getNumNodeManagers)
 
     val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
-    logInfo("""Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
+    logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
         queueInfo.getQueueName,
         queueInfo.getCurrentCapacity,
@@ -347,7 +349,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
     val prefix = " --args "
     val args = clientArgs.userArgs
     val retval = new StringBuilder()
-    for (arg <- args){
+    for (arg <- args) {
       retval.append(prefix).append(" '").append(arg).append("' ")
     }
     retval.toString
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 09ac8d77ca..e4c6ab212c 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -35,9 +35,11 @@ import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+  extends Logging {
 
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -50,7 +52,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   private var amClient: AMRMClient[ContainerRequest] = _
 
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1
   var actor: ActorRef = _
 
@@ -93,7 +95,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // must be <= timeoutInterval/ 2.
     // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
     // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
     reporterThread = launchReporterThread(interval)
 
     // Wait for the reporter thread to Finish.
@@ -139,8 +141,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
         Thread.sleep(100)
       }
     }
-    sparkConf.set("spark.driver.host",  driverHost)
-    sparkConf.set("spark.driver.port",  driverPort.toString)
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
 
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -169,7 +171,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // TODO: Handle container failure
 
     yarnAllocator.addResourceRequests(args.numWorkers)
-    while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+    while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
       yarnAllocator.allocateResources()
       Thread.sleep(100)
     }
@@ -180,7 +182,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   // TODO: We might want to extend this to allocate more containers in case they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -212,7 +214,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   def finishApplicationMaster(status: FinalApplicationStatus) {
     logInfo("finish ApplicationMaster with " + status)
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
   }
 
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 69170c7427..2bb11e54c5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,9 +39,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+    sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -126,7 +128,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-      .getOrElse(""))
+        .getOrElse(""))
 
     if (localDirs.isEmpty()) {
       throw new Exception("Yarn Local dirs can't be empty")
@@ -165,11 +167,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     resourceManager.registerApplicationMaster(appMasterRequest)
   }
 
-  private def startUserClass(): Thread  = {
+  private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
       args.userClass,
-      false /* initialize */,
+      false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
@@ -231,7 +233,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
           yarnConf,
           resourceManager,
           appAttemptId,
-          args, 
+          args,
           sparkContext.getConf)
       }
     }
@@ -286,7 +288,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
   }
 
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -385,6 +387,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
     }
   }
+
 }
 
 object ApplicationMaster {
@@ -394,6 +397,7 @@ object ApplicationMaster {
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+
   def incrementAllocatorLoop(by: Int) {
     val count = yarnAllocatorLoop.getAndAdd(by)
     if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
@@ -432,6 +436,7 @@ object ApplicationMaster {
         // This is not only logs, but also ensures that log system is initialized for this instance
         // when we are actually 'run'-ing.
         logInfo("Adding shutdown hook for context " + sc)
+
         override def run() {
           logInfo("Invoking sc stop from shutdown hook")
           sc.stop()
@@ -440,7 +445,7 @@ object ApplicationMaster {
           master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
         }
       }
-    } )
+    })
   }
 
   // Wait for initialization to complete and atleast 'some' nodes can get allocated.
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 525ea72762..6abb4d5017 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,9 +45,11 @@ import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
 
 
-class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+  extends YarnClientImpl with Logging {
 
-  def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ClientArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ClientArguments) = this(args, new SparkConf())
 
@@ -123,7 +125,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
       clusterMetrics.getNumNodeManagers)
 
     val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
-    logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+    logInfo( """Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
         queueInfo.getQueueName,
         queueInfo.getCurrentCapacity,
@@ -143,7 +145,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("AM size is to large to run on this cluster "  + amMem)
+      logError("AM size is to large to run on this cluster " + amMem)
       System.exit(1)
     }
@@ -328,7 +330,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
     val prefix = " --args "
     val args = clientArgs.userArgs
     val retval = new StringBuilder()
-    for (arg <- args){
+    for (arg <- args) {
       retval.append(prefix).append(" '").append(arg).append("' ")
     }
     retval.toString
@@ -467,10 +469,10 @@ object Client {
     // Note that anything with SPARK prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
 
-    val sparkConf =  new SparkConf
-    val args = new ClientArguments(argStrings,sparkConf)
+    val sparkConf = new SparkConf
+    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args,sparkConf).run
+    new Client(args, sparkConf).run
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 09303ae5c2..8254d628fb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,7 +24,7 @@ import collection.mutable.{ArrayBuffer, HashMap}
 import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
 
 // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String],val sparkConf: SparkConf) {
+class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 1a792ddf66..300e78612e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -34,9 +34,11 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+  extends Logging {
 
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -47,9 +49,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   private var yarnAllocator: YarnAllocationHandler = null
 
-  private var driverClosed:Boolean = false
+  private var driverClosed: Boolean = false
 
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1
   var actor: ActorRef = null
 
@@ -83,7 +85,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
     if (minimumMemory > 0) {
       val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-      val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+      val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
 
       if (numCore > 0) {
         // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
@@ -104,7 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // must be <= timeoutInterval/ 2.
     // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
     // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
     reporterThread = launchReporterThread(interval)
 
     // Wait for the reporter thread to Finish.
@@ -165,8 +167,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
         Thread.sleep(100)
       }
     }
-    sparkConf.set("spark.driver.host",  driverHost)
-    sparkConf.set("spark.driver.port",  driverPort.toString)
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
 
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -188,7 +190,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // Wait until all containers have finished
     // TODO: This is a bit ugly. Can we make it nicer?
    // TODO: Handle container failure
-    while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+    while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
      yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
       Thread.sleep(100)
     }
@@ -199,7 +201,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   // TODO: We might want to extend this to allocate more containers in case they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
-- 
GitLab
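
Editor's note: beyond the style cleanups, the substantive change this patch repeats in every
file is the same constructor pattern — stop creating a SparkConf inside ApplicationMaster,
Client, and WorkerLauncher, and instead accept one through the primary constructor, with
auxiliary constructors supplying defaults for callers that have nothing to pass. Below is a
minimal, self-contained Scala sketch of that shape. It is an illustration, not the real Spark
code: SparkConf and Configuration here are tiny hypothetical stand-ins, and Client is reduced
to a stub.

    object SparkConfThreadingSketch {

      // Hypothetical stand-in for org.apache.spark.SparkConf: a mutable key/value store.
      final class SparkConf {
        private val settings = scala.collection.mutable.Map[String, String]()
        def set(key: String, value: String): SparkConf = { settings(key) = value; this }
        def get(key: String, default: String): String = settings.getOrElse(key, default)
      }

      // Hypothetical stand-in for org.apache.hadoop.conf.Configuration.
      final class Configuration

      // The pattern from the patch: the primary constructor takes an externally created
      // SparkConf; auxiliary constructors default the Configuration and SparkConf for
      // callers that do not supply them.
      class Client(args: Array[String], conf: Configuration, sparkConf: SparkConf) {

        def this(args: Array[String], sparkConf: SparkConf) =
          this(args, new Configuration(), sparkConf)

        def this(args: Array[String]) = this(args, new SparkConf())

        // Stub: the real Client submits the application to the ResourceManager.
        def run(): Unit =
          println("driver host = " + sparkConf.get("spark.driver.host", "<unset>"))
      }

      def main(argStrings: Array[String]): Unit = {
        // Mirrors the change in Client.main: build one SparkConf up front and hand the
        // same instance to everything that needs it (ClientArguments, Client, ...).
        val sparkConf = new SparkConf
        sparkConf.set("spark.driver.host", "localhost")
        new Client(argStrings, sparkConf).run()
      }
    }

The design point is that a single shared SparkConf instance becomes the one source of truth:
anything one component writes into it is visible to every other component that was handed the
same object, instead of each class reading system properties into a private copy.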
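
A second hedged sketch, because the WorkerLauncher hunks are where the shared SparkConf pays
off: once the driver has reported in, WorkerLauncher writes spark.driver.host and
spark.driver.port into the shared conf and derives the Akka URL of the driver's scheduler
actor from the same values. The host and port below are made up, and the actor name is
assumed to be the value of CoarseGrainedSchedulerBackend.ACTOR_NAME in this era of Spark.

    object DriverUrlSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical values; in the patch, WorkerLauncher loops with
        // Thread.sleep(100) until the driver has published these.
        val driverHost = "192.168.1.10"
        val driverPort = 35000
        val actorName  = "CoarseGrainedScheduler" // assumed ACTOR_NAME

        // Same format string as the patch; %s accepts the port via toString.
        val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
          driverHost, driverPort.toString, actorName)

        println(driverUrl) // akka.tcp://spark@192.168.1.10:35000/user/CoarseGrainedScheduler
      }
    }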