diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index f05413a53bf58d616634ef6f32e2073b56c5ced6..457122745b61847739080681da0aa429dadc0a10 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -27,7 +27,8 @@ case class ExecutorStateChanged(
     jobId: String,
     execId: Int,
     state: ExecutorState,
-    message: Option[String])
+    message: Option[String],
+    exitStatus: Option[Int])
   extends DeployMessage
 
 // Master to Worker
@@ -58,7 +59,8 @@ private[spark]
 case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
 
 private[spark]
-case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
+case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
+  exitStatus: Option[Int])
 
 private[spark]
 case class JobKilled(message: String)
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index c57a1d33e91f282302876a637f69cbada39597c2..90fe9508cdba9b5df0b438ee4e91447915c2c6fd 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -66,12 +66,12 @@ private[spark] class Client(
         logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
         listener.executorAdded(fullId, workerId, host, cores, memory)
 
-      case ExecutorUpdated(id, state, message) =>
+      case ExecutorUpdated(id, state, message, exitStatus) =>
         val fullId = jobId + "/" + id
         val messageText = message.map(s => " (" + s + ")").getOrElse("")
         logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
         if (ExecutorState.isFinished(state)) {
-          listener.executorRemoved(fullId, message.getOrElse(""))
+          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
         }
 
       case Terminated(actor_) if actor_ == master =>
diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala
index a8fa982085dd25eae4a24f00ea8c37c47857d155..da6abcc9c26c083482eaaac5f002e2151d4803b1 100644
--- a/core/src/main/scala/spark/deploy/client/ClientListener.scala
+++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala
@@ -14,5 +14,5 @@ private[spark] trait ClientListener {
 
   def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit
 
-  def executorRemoved(id: String, message: String): Unit
+  def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit
 }
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
index 5b710f5520a8658c4e4fea1e05cd3a5a62e275d0..57a7e123b78a5449802469eb9bab7bb21753020d 100644
--- a/core/src/main/scala/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/spark/deploy/client/TestClient.scala
@@ -18,7 +18,7 @@ private[spark] object TestClient {
 
     def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {}
 
-    def executorRemoved(id: String, message: String) {}
+    def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {}
   }
 
   def main(args: Array[String]) {
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 31fb83f2e21ab6dba65c9de57dd6c28bbc48b1ea..b30c8e99b560e66de457605056e15cf01cfdcc0b 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -83,12 +83,12 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
       schedule()
     }
 
-    case ExecutorStateChanged(jobId, execId, state, message) => {
+    case ExecutorStateChanged(jobId, execId, state, message, exitStatus) => {
       val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId))
       execOption match {
         case Some(exec) => {
           exec.state = state
-          exec.job.actor ! ExecutorUpdated(execId, state, message)
+          exec.job.actor ! ExecutorUpdated(execId, state, message, exitStatus)
           if (ExecutorState.isFinished(state)) {
             val jobInfo = idToJob(jobId)
             // Remove this executor from the worker and job
@@ -218,7 +218,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     actorToWorker -= worker.actor
     addressToWorker -= worker.actor.path.address
     for (exec <- worker.executors.values) {
-      exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None)
+      exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
       exec.job.executors -= exec.id
     }
   }
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 07ae7bca78e9b582fe7892af194866ae2824759a..beceb55ecdf54750016af497b0689b5a37191a67 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -60,7 +60,7 @@ private[spark] class ExecutorRunner(
         process.destroy()
         process.waitFor()
       }
-      worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None)
+      worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
     }
   }
@@ -134,7 +134,8 @@ private[spark] class ExecutorRunner(
       // times on the same machine.
      val exitCode = process.waitFor()
       val message = "Command exited with code " + exitCode
-      worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
+      worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message),
+        Some(exitCode))
     } catch {
       case interrupted: InterruptedException =>
         logInfo("Runner thread for executor " + fullId + " interrupted")
@@ -145,7 +146,7 @@ private[spark] class ExecutorRunner(
           process.destroy()
         }
         val message = e.getClass + ": " + e.getMessage
-        worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
+        worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message), None)
       }
     }
   }
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 31b8f0f9554c42805dd2f71446b217bd52cbf196..7c9e588ea2d3277bb85a800c429fdc6179da1727 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -127,10 +127,10 @@ private[spark] class Worker(
       manager.start()
       coresUsed += cores_
       memoryUsed += memory_
-      master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None)
+      master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None, None)
 
-    case ExecutorStateChanged(jobId, execId, state, message) =>
-      master ! ExecutorStateChanged(jobId, execId, state, message)
+    case ExecutorStateChanged(jobId, execId, state, message, exitStatus) =>
+      master ! ExecutorStateChanged(jobId, execId, state, message, exitStatus)
       val fullId = jobId + "/" + execId
       if (ExecutorState.isFinished(state)) {
         val executor = executors(fullId)
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index efaf2d330c4fa25f846df304653e6ed7b2e38932..e2301347e510340b25c4d10731dc6db7987048c0 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -71,14 +71,10 @@ private[spark] class SparkDeploySchedulerBackend(
       id, host, cores, Utils.memoryMegabytesToString(memory)))
   }
 
-  def executorRemoved(id: String, message: String) {
-    var reason: ExecutorLossReason = SlaveLost(message)
-    if (message.startsWith("Command exited with code ")) {
-      try {
-        reason = ExecutorExited(message.substring("Command exited with code ".length).toInt)
-      } catch {
-        case nfe: NumberFormatException => {}
-      }
+  def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {
+    val reason: ExecutorLossReason = exitStatus match {
+      case Some(code) => ExecutorExited(code)
+      case None => SlaveLost(message)
     }
     logInfo("Executor %s removed: %s".format(id, message))
     executorIdToSlaveId.get(id) match {
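
Note on usage: exitStatus is Some(exitCode) only on the ExecutorRunner path where the executor process actually exited; the kill, worker-lost, and exception paths pass None, which SparkDeploySchedulerBackend now maps to SlaveLost instead of parsing the message string. A minimal sketch of a listener consuming the new callback follows; the LoggingListener name and the connected/disconnected signatures are assumptions for illustration and are not part of this patch.

package spark.deploy.client

// Illustrative only: not part of this patch. Only executorAdded and
// executorRemoved appear in the hunks above; the other callbacks are assumed.
private[spark] class LoggingListener extends ClientListener {
  def connected(jobId: String) {}

  def disconnected() {}

  def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {}

  def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {
    // Some(code): the executor process exited on its own and reported a code.
    // None: the executor was killed, its worker was lost, or the runner failed
    // before the process produced an exit code.
    exitStatus match {
      case Some(code) => println("Executor %s exited with code %d".format(id, code))
      case None => println("Executor %s removed: %s".format(id, message))
    }
  }
}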