Skip to content
Snippets Groups Projects
Commit b0983c57 authored by Charles Reiss's avatar Charles Reiss
Browse files

Notify standalone deploy client of application death.

Usually, this isn't necessary since the application will be removed
as a result of the deploy client disconnecting, but occassionally, the
standalone deploy master removes an application otherwise.

Also mark applications as FAILED instead of FINISHED when they are
killed as a result of their executors failing too many times.
parent 9f0dc829
No related branches found
No related tags found
No related merge requests found
...@@ -65,7 +65,7 @@ case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String ...@@ -65,7 +65,7 @@ case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String
exitStatus: Option[Int]) exitStatus: Option[Int])
private[spark] private[spark]
case class appKilled(message: String) case class ApplicationRemoved(message: String)
// Internal message in Client // Internal message in Client
......
...@@ -54,6 +54,11 @@ private[spark] class Client( ...@@ -54,6 +54,11 @@ private[spark] class Client(
appId = appId_ appId = appId_
listener.connected(appId) listener.connected(appId)
case ApplicationRemoved(message) =>
logError("Master removed our application: %s; stopping client".format(message))
markDisconnected()
context.stop(self)
case ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) => case ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) =>
val fullId = appId + "/" + id val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores)) logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
......
...@@ -107,7 +107,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -107,7 +107,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
} else { } else {
logError("Application %s with ID %s failed %d times, removing it".format( logError("Application %s with ID %s failed %d times, removing it".format(
appInfo.desc.name, appInfo.id, appInfo.retryCount)) appInfo.desc.name, appInfo.id, appInfo.retryCount))
removeApplication(appInfo) removeApplication(appInfo, ApplicationState.FAILED)
} }
} }
} }
...@@ -129,19 +129,19 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -129,19 +129,19 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
// The disconnected actor could've been either a worker or an app; remove whichever of // The disconnected actor could've been either a worker or an app; remove whichever of
// those we have an entry for in the corresponding actor hashmap // those we have an entry for in the corresponding actor hashmap
actorToWorker.get(actor).foreach(removeWorker) actorToWorker.get(actor).foreach(removeWorker)
actorToApp.get(actor).foreach(removeApplication) actorToApp.get(actor).foreach(finishApplication)
} }
case RemoteClientDisconnected(transport, address) => { case RemoteClientDisconnected(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was // The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker) addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(removeApplication) addressToApp.get(address).foreach(finishApplication)
} }
case RemoteClientShutdown(transport, address) => { case RemoteClientShutdown(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was // The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker) addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(removeApplication) addressToApp.get(address).foreach(finishApplication)
} }
case RequestMasterState => { case RequestMasterState => {
...@@ -257,7 +257,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -257,7 +257,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
return app return app
} }
def removeApplication(app: ApplicationInfo) { def finishApplication(app: ApplicationInfo) {
removeApplication(app, ApplicationState.FINISHED)
}
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
if (apps.contains(app)) { if (apps.contains(app)) {
logInfo("Removing app " + app.id) logInfo("Removing app " + app.id)
apps -= app apps -= app
...@@ -270,7 +274,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -270,7 +274,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
exec.worker.removeExecutor(exec) exec.worker.removeExecutor(exec)
exec.worker.actor ! KillExecutor(exec.application.id, exec.id) exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
} }
app.markFinished(ApplicationState.FINISHED) // TODO: Mark it as FAILED if it failed app.markFinished(state)
app.driver ! ApplicationRemoved(state.toString)
schedule() schedule()
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment