From 0aad42b5e732ac6865b8e3c2cffa35d4ff48d5ca Mon Sep 17 00:00:00 2001
From: Charles Reiss <charles@eecs.berkeley.edu>
Date: Thu, 13 Dec 2012 20:33:57 -0800
Subject: [PATCH] Have standalone cluster report exit codes to clients.
 Addresses SPARK-639.

---
 core/src/main/scala/spark/deploy/DeployMessage.scala  |  6 ++++--
 core/src/main/scala/spark/deploy/client/Client.scala  |  4 ++--
 .../scala/spark/deploy/client/ClientListener.scala    |  2 +-
 .../main/scala/spark/deploy/client/TestClient.scala   |  2 +-
 core/src/main/scala/spark/deploy/master/Master.scala  |  6 +++---
 .../scala/spark/deploy/worker/ExecutorRunner.scala    |  7 ++++---
 core/src/main/scala/spark/deploy/worker/Worker.scala  |  6 +++---
 .../cluster/SparkDeploySchedulerBackend.scala         | 11 ++---------
 8 files changed, 20 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index f05413a53b..457122745b 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -27,7 +27,8 @@ case class ExecutorStateChanged(
     jobId: String,
     execId: Int,
     state: ExecutorState,
-    message: Option[String])
+    message: Option[String],
+    exitStatus: Option[Int])
   extends DeployMessage
 
 // Master to Worker
@@ -58,7 +59,8 @@ private[spark]
 case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
 
 private[spark]
-case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
+case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
+                           exitStatus: Option[Int])
 
 private[spark]
 case class JobKilled(message: String)
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index c57a1d33e9..90fe9508cd 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -66,12 +66,12 @@ private[spark] class Client(
         logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
         listener.executorAdded(fullId, workerId, host, cores, memory)
 
-      case ExecutorUpdated(id, state, message) =>
+      case ExecutorUpdated(id, state, message, exitStatus) =>
         val fullId = jobId + "/" + id
         val messageText = message.map(s => " (" + s + ")").getOrElse("")
         logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
         if (ExecutorState.isFinished(state)) {
-          listener.executorRemoved(fullId, message.getOrElse(""))
+          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
         }
 
       case Terminated(actor_) if actor_ == master =>
diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala
index a8fa982085..da6abcc9c2 100644
--- a/core/src/main/scala/spark/deploy/client/ClientListener.scala
+++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala
@@ -14,5 +14,5 @@ private[spark] trait ClientListener {
 
   def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit
 
-  def executorRemoved(id: String, message: String): Unit
+  def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit
 }
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
index 5b710f5520..57a7e123b7 100644
--- a/core/src/main/scala/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/spark/deploy/client/TestClient.scala
@@ -18,7 +18,7 @@ private[spark] object TestClient {
 
     def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {}
 
-    def executorRemoved(id: String, message: String) {}
+    def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {}
   }
 
   def main(args: Array[String]) {
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 31fb83f2e2..b30c8e99b5 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -83,12 +83,12 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
       schedule()
     }
 
-    case ExecutorStateChanged(jobId, execId, state, message) => {
+    case ExecutorStateChanged(jobId, execId, state, message, exitStatus) => {
       val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId))
       execOption match {
         case Some(exec) => {
           exec.state = state
-          exec.job.actor ! ExecutorUpdated(execId, state, message)
+          exec.job.actor ! ExecutorUpdated(execId, state, message, exitStatus)
           if (ExecutorState.isFinished(state)) {
             val jobInfo = idToJob(jobId)
             // Remove this executor from the worker and job
@@ -218,7 +218,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     actorToWorker -= worker.actor
     addressToWorker -= worker.actor.path.address
     for (exec <- worker.executors.values) {
-      exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None)
+      exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
       exec.job.executors -= exec.id
     }
   }
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 07ae7bca78..beceb55ecd 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -60,7 +60,7 @@ private[spark] class ExecutorRunner(
         process.destroy()
         process.waitFor()
       }
-      worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None)
+      worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
     }
   }
@@ -134,7 +134,8 @@ private[spark] class ExecutorRunner(
       // times on the same machine.
       val exitCode = process.waitFor()
       val message = "Command exited with code " + exitCode
-      worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
+      worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message),
+                                    Some(exitCode))
     } catch {
       case interrupted: InterruptedException =>
         logInfo("Runner thread for executor " + fullId + " interrupted")
@@ -145,7 +146,7 @@ private[spark] class ExecutorRunner(
           process.destroy()
         }
         val message = e.getClass + ": " + e.getMessage
-        worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
+        worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message), None)
       }
     }
   }
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 31b8f0f955..7c9e588ea2 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -127,10 +127,10 @@ private[spark] class Worker(
       manager.start()
       coresUsed += cores_
       memoryUsed += memory_
-      master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None)
+      master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None, None)
 
-    case ExecutorStateChanged(jobId, execId, state, message) =>
-      master ! ExecutorStateChanged(jobId, execId, state, message)
+    case ExecutorStateChanged(jobId, execId, state, message, exitStatus) =>
+      master ! ExecutorStateChanged(jobId, execId, state, message, exitStatus)
       val fullId = jobId + "/" + execId
       if (ExecutorState.isFinished(state)) {
         val executor = executors(fullId)
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index efaf2d330c..7b58d0c022 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -71,15 +71,8 @@ private[spark] class SparkDeploySchedulerBackend(
        id, host, cores, Utils.memoryMegabytesToString(memory)))
   }
 
-  def executorRemoved(id: String, message: String) {
-    var reason: ExecutorLossReason = SlaveLost(message)
-    if (message.startsWith("Command exited with code ")) {
-      try {
-        reason = ExecutorExited(message.substring("Command exited with code ".length).toInt)
-      } catch {
-        case nfe: NumberFormatException => {}
-      }
-    }
+  def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {
+    var reason: ExecutorLossReason = exitStatus.map(ExecutorExited).getOrElse(SlaveLost(message))
     logInfo("Executor %s removed: %s".format(id, message))
     executorIdToSlaveId.get(id) match {
       case Some(slaveId) => 
-- 
GitLab