Skip to content
Snippets Groups Projects
Commit 557cfd0f authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #515 from woggling/deploy-app-death

Notify standalone deploy client of application death.
parents a59cc606 b0983c57
No related branches found
No related tags found
No related merge requests found
...@@ -65,7 +65,7 @@ case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String ...@@ -65,7 +65,7 @@ case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String
exitStatus: Option[Int]) exitStatus: Option[Int])
private[spark] private[spark]
case class appKilled(message: String) case class ApplicationRemoved(message: String)
// Internal message in Client // Internal message in Client
......
...@@ -54,6 +54,11 @@ private[spark] class Client( ...@@ -54,6 +54,11 @@ private[spark] class Client(
appId = appId_ appId = appId_
listener.connected(appId) listener.connected(appId)
case ApplicationRemoved(message) =>
logError("Master removed our application: %s; stopping client".format(message))
markDisconnected()
context.stop(self)
case ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) => case ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) =>
val fullId = appId + "/" + id val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores)) logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
......
...@@ -107,7 +107,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -107,7 +107,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
} else { } else {
logError("Application %s with ID %s failed %d times, removing it".format( logError("Application %s with ID %s failed %d times, removing it".format(
appInfo.desc.name, appInfo.id, appInfo.retryCount)) appInfo.desc.name, appInfo.id, appInfo.retryCount))
removeApplication(appInfo) removeApplication(appInfo, ApplicationState.FAILED)
} }
} }
} }
...@@ -129,19 +129,19 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -129,19 +129,19 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
// The disconnected actor could've been either a worker or an app; remove whichever of // The disconnected actor could've been either a worker or an app; remove whichever of
// those we have an entry for in the corresponding actor hashmap // those we have an entry for in the corresponding actor hashmap
actorToWorker.get(actor).foreach(removeWorker) actorToWorker.get(actor).foreach(removeWorker)
actorToApp.get(actor).foreach(removeApplication) actorToApp.get(actor).foreach(finishApplication)
} }
case RemoteClientDisconnected(transport, address) => { case RemoteClientDisconnected(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was // The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker) addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(removeApplication) addressToApp.get(address).foreach(finishApplication)
} }
case RemoteClientShutdown(transport, address) => { case RemoteClientShutdown(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was // The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker) addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(removeApplication) addressToApp.get(address).foreach(finishApplication)
} }
case RequestMasterState => { case RequestMasterState => {
...@@ -257,7 +257,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -257,7 +257,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
return app return app
} }
def removeApplication(app: ApplicationInfo) { def finishApplication(app: ApplicationInfo) {
removeApplication(app, ApplicationState.FINISHED)
}
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
if (apps.contains(app)) { if (apps.contains(app)) {
logInfo("Removing app " + app.id) logInfo("Removing app " + app.id)
apps -= app apps -= app
...@@ -270,7 +274,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -270,7 +274,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
exec.worker.removeExecutor(exec) exec.worker.removeExecutor(exec)
exec.worker.actor ! KillExecutor(exec.application.id, exec.id) exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
} }
app.markFinished(ApplicationState.FINISHED) // TODO: Mark it as FAILED if it failed app.markFinished(state)
app.driver ! ApplicationRemoved(state.toString)
schedule() schedule()
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment