diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index e38459b883b66b7fd0d7e7dbee05cbeac3645b16..449b953530ff99a7862dc6fbd2a825153d0a75be 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,7 +22,7 @@ private[spark] class ApplicationDescription(
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
-    val sparkHome: String,
+    val sparkHome: Option[String],
     val appUiUrl: String)
   extends Serializable {
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 5e824e1a678b647c1ef95387ae8674736e6565cd..83ce14a0a806a23affacc7e5b4a5868631d3fef6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -79,8 +79,7 @@ private[deploy] object DeployMessages {
       execId: Int,
       appDesc: ApplicationDescription,
       cores: Int,
-      memory: Int,
-      sparkHome: String)
+      memory: Int)
     extends DeployMessage
 
   case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index ffa909c26b64aa26c725b57cf9dfc80a7cd749fc..80179320325deec9871e12e1fed5be135ea83072 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -49,7 +49,7 @@ private[spark] object TestClient {
       conf = new SparkConf)
     val desc = new ApplicationDescription(
       "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
-      "dummy-spark-home", "ignored")
+      Some("dummy-spark-home"), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d9ea96afcf52a2e4719f571c1c27e5dbf91c6389..fe9770cec27ac88ae05619558e62c4896054600d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -480,7 +480,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         for (pos <- 0 until numUsable) {
           if (assigned(pos) > 0) {
             val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
-            launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)
+            launchExecutor(usableWorkers(pos), exec)
             app.state = ApplicationState.RUNNING
           }
         }
@@ -493,7 +493,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
           val coresToUse = math.min(worker.coresFree, app.coresLeft)
           if (coresToUse > 0) {
             val exec = app.addExecutor(worker, coresToUse)
-            launchExecutor(worker, exec, app.desc.sparkHome)
+            launchExecutor(worker, exec)
             app.state = ApplicationState.RUNNING
           }
         }
@@ -502,11 +502,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
   }
 
-  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
+  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
     worker.actor ! LaunchExecutor(masterUrl,
-      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
+      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
     exec.application.driver ! ExecutorAdded(
       exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 312560d7063a44cbe006be102c3ebc66bf5a80c4..fbf2e0f30fde9d187876e82ef1f91fc463aaf2c5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -76,7 +76,7 @@ private[spark] class Worker(
   @volatile var registered = false
   @volatile var connected = false
   val workerId = generateWorkerId()
-  var sparkHome: File = null
+  val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -120,7 +120,6 @@ private[spark] class Worker(
     assert(!registered)
     logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
       host, port, cores, Utils.megabytesToString(memory)))
-    sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
@@ -204,22 +203,34 @@ private[spark] class Worker(
         System.exit(1)
       }
 
-    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
       } else {
-        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
-        // TODO (pwendell): We shuld make sparkHome an Option[String] in
-        // ApplicationDescription to be more explicit about this.
-        val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
-        val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
-        executors(appId + "/" + execId) = manager
-        manager.start()
-        coresUsed += cores_
-        memoryUsed += memory_
-        masterLock.synchronized {
-          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+        try {
+          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+            self, workerId, host,
+            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
+            workDir, akkaUrl, ExecutorState.RUNNING)
+          executors(appId + "/" + execId) = manager
+          manager.start()
+          coresUsed += cores_
+          memoryUsed += memory_
+          masterLock.synchronized {
+            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+          }
+        } catch {
+          case e: Exception => {
+            logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+            if (executors.contains(appId + "/" + execId)) {
+              executors(appId + "/" + execId).kill()
+              executors -= appId + "/" + execId
+            }
+            masterLock.synchronized {
+              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+            }
+          }
         }
       }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index faa6e1ebe886f46b030f2114844a8229cee357c5..33aac52051bfcf278d1a3f3aa3191ae31138cf2e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -50,7 +50,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
-    val sparkHome = sc.getSparkHome().getOrElse(null)
+    val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command,
       sparkHome, "http://" + sc.ui.appUIAddress)
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index d05bbd6ff7e6f0a84a47a3e1c10f834ef5e8b658..693b1ab237d54af8cc4e5cd8754f13a58f65706c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -74,7 +74,7 @@ class JsonProtocolSuite extends FunSuite {
 
   def createAppDesc(): ApplicationDescription = {
     val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
-    new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl")
+    new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
   }
 
   def createAppInfo() : ApplicationInfo = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index a79ee690d39ff37eb6f77100471f1ee9d76040fb..4baa65659f041e046e8c74d84f6210a59103e9d0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -26,11 +26,11 @@ import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription}
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
       Command("foo", Seq(),Map()), sparkHome, "appUiUrl")
     val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
       f("ooga"), "blah", ExecutorState.RUNNING)
     assert(er.getCommandSeq.last === appId)