Skip to content
Snippets Groups Projects
Commit 1072f970 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #331 from woggling/deploy-exit-status

Have standalone cluster report exit codes to clients
parents d6d91047 c528932a
No related branches found
No related tags found
No related merge requests found
...@@ -27,7 +27,8 @@ case class ExecutorStateChanged( ...@@ -27,7 +27,8 @@ case class ExecutorStateChanged(
jobId: String, jobId: String,
execId: Int, execId: Int,
state: ExecutorState, state: ExecutorState,
message: Option[String]) message: Option[String],
exitStatus: Option[Int])
extends DeployMessage extends DeployMessage
// Master to Worker // Master to Worker
...@@ -58,7 +59,8 @@ private[spark] ...@@ -58,7 +59,8 @@ private[spark]
case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
private[spark] private[spark]
case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String]) case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
exitStatus: Option[Int])
private[spark] private[spark]
case class JobKilled(message: String) case class JobKilled(message: String)
......
...@@ -66,12 +66,12 @@ private[spark] class Client( ...@@ -66,12 +66,12 @@ private[spark] class Client(
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores)) logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
listener.executorAdded(fullId, workerId, host, cores, memory) listener.executorAdded(fullId, workerId, host, cores, memory)
case ExecutorUpdated(id, state, message) => case ExecutorUpdated(id, state, message, exitStatus) =>
val fullId = jobId + "/" + id val fullId = jobId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("") val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) { if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse("")) listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
} }
case Terminated(actor_) if actor_ == master => case Terminated(actor_) if actor_ == master =>
......
...@@ -14,5 +14,5 @@ private[spark] trait ClientListener { ...@@ -14,5 +14,5 @@ private[spark] trait ClientListener {
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit
def executorRemoved(id: String, message: String): Unit def executorRemoved(id: String, message: String, exitStatus: Option[Int]): Unit
} }
...@@ -18,7 +18,7 @@ private[spark] object TestClient { ...@@ -18,7 +18,7 @@ private[spark] object TestClient {
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {} def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {}
def executorRemoved(id: String, message: String) {} def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {}
} }
def main(args: Array[String]) { def main(args: Array[String]) {
......
...@@ -83,12 +83,12 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -83,12 +83,12 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
schedule() schedule()
} }
case ExecutorStateChanged(jobId, execId, state, message) => { case ExecutorStateChanged(jobId, execId, state, message, exitStatus) => {
val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId)) val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId))
execOption match { execOption match {
case Some(exec) => { case Some(exec) => {
exec.state = state exec.state = state
exec.job.actor ! ExecutorUpdated(execId, state, message) exec.job.actor ! ExecutorUpdated(execId, state, message, exitStatus)
if (ExecutorState.isFinished(state)) { if (ExecutorState.isFinished(state)) {
val jobInfo = idToJob(jobId) val jobInfo = idToJob(jobId)
// Remove this executor from the worker and job // Remove this executor from the worker and job
...@@ -218,7 +218,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -218,7 +218,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
actorToWorker -= worker.actor actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address addressToWorker -= worker.actor.path.address
for (exec <- worker.executors.values) { for (exec <- worker.executors.values) {
exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None) exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
exec.job.executors -= exec.id exec.job.executors -= exec.id
} }
} }
......
...@@ -60,7 +60,7 @@ private[spark] class ExecutorRunner( ...@@ -60,7 +60,7 @@ private[spark] class ExecutorRunner(
process.destroy() process.destroy()
process.waitFor() process.waitFor()
} }
worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None) worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook) Runtime.getRuntime.removeShutdownHook(shutdownHook)
} }
} }
...@@ -134,7 +134,8 @@ private[spark] class ExecutorRunner( ...@@ -134,7 +134,8 @@ private[spark] class ExecutorRunner(
// times on the same machine. // times on the same machine.
val exitCode = process.waitFor() val exitCode = process.waitFor()
val message = "Command exited with code " + exitCode val message = "Command exited with code " + exitCode
worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message)) worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message),
Some(exitCode))
} catch { } catch {
case interrupted: InterruptedException => case interrupted: InterruptedException =>
logInfo("Runner thread for executor " + fullId + " interrupted") logInfo("Runner thread for executor " + fullId + " interrupted")
...@@ -145,7 +146,7 @@ private[spark] class ExecutorRunner( ...@@ -145,7 +146,7 @@ private[spark] class ExecutorRunner(
process.destroy() process.destroy()
} }
val message = e.getClass + ": " + e.getMessage val message = e.getClass + ": " + e.getMessage
worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message)) worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message), None)
} }
} }
} }
......
...@@ -127,10 +127,10 @@ private[spark] class Worker( ...@@ -127,10 +127,10 @@ private[spark] class Worker(
manager.start() manager.start()
coresUsed += cores_ coresUsed += cores_
memoryUsed += memory_ memoryUsed += memory_
master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None) master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None, None)
case ExecutorStateChanged(jobId, execId, state, message) => case ExecutorStateChanged(jobId, execId, state, message, exitStatus) =>
master ! ExecutorStateChanged(jobId, execId, state, message) master ! ExecutorStateChanged(jobId, execId, state, message, exitStatus)
val fullId = jobId + "/" + execId val fullId = jobId + "/" + execId
if (ExecutorState.isFinished(state)) { if (ExecutorState.isFinished(state)) {
val executor = executors(fullId) val executor = executors(fullId)
......
...@@ -71,14 +71,10 @@ private[spark] class SparkDeploySchedulerBackend( ...@@ -71,14 +71,10 @@ private[spark] class SparkDeploySchedulerBackend(
id, host, cores, Utils.memoryMegabytesToString(memory))) id, host, cores, Utils.memoryMegabytesToString(memory)))
} }
def executorRemoved(id: String, message: String) { def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {
var reason: ExecutorLossReason = SlaveLost(message) val reason: ExecutorLossReason = exitStatus match {
if (message.startsWith("Command exited with code ")) { case Some(code) => ExecutorExited(code)
try { case None => SlaveLost(message)
reason = ExecutorExited(message.substring("Command exited with code ".length).toInt)
} catch {
case nfe: NumberFormatException => {}
}
} }
logInfo("Executor %s removed: %s".format(id, message)) logInfo("Executor %s removed: %s".format(id, message))
executorIdToSlaveId.get(id) match { executorIdToSlaveId.get(id) match {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment