Skip to content
Snippets Groups Projects
Commit 34719ba3 authored by Aaron Davidson, committed by Patrick Wendell
Browse files

SPARK-1689 AppClient should indicate app is dead() when removed

Previously, we indicated disconnected(), which keeps the application in a limbo state where it has no executors but thinks it will get them soon.

This is a bug fix that hopefully can be included in 1.0.

Author: Aaron Davidson <aaron@databricks.com>

Closes #605 from aarondav/appremoved and squashes the following commits:

bea02a2 [Aaron Davidson] SPARK-1689 AppClient should indicate app is dead() when removed
parent ce72c72a
No related branches found
No related tags found
No related merge requests found
...@@ -92,8 +92,7 @@ private[spark] class AppClient( ...@@ -92,8 +92,7 @@ private[spark] class AppClient(
if (registered) { if (registered) {
retryTimer.cancel() retryTimer.cancel()
} else if (retries >= REGISTRATION_RETRIES) { } else if (retries >= REGISTRATION_RETRIES) {
logError("All masters are unresponsive! Giving up.") markDead("All masters are unresponsive! Giving up.")
markDead()
} else { } else {
tryRegisterAllMasters() tryRegisterAllMasters()
} }
...@@ -126,8 +125,7 @@ private[spark] class AppClient( ...@@ -126,8 +125,7 @@ private[spark] class AppClient(
listener.connected(appId) listener.connected(appId)
case ApplicationRemoved(message) => case ApplicationRemoved(message) =>
logError("Master removed our application: %s; stopping client".format(message)) markDead("Master removed our application: %s".format(message))
markDisconnected()
context.stop(self) context.stop(self)
case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) => case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
...@@ -158,7 +156,7 @@ private[spark] class AppClient( ...@@ -158,7 +156,7 @@ private[spark] class AppClient(
logWarning(s"Could not connect to $address: $cause") logWarning(s"Could not connect to $address: $cause")
case StopAppClient => case StopAppClient =>
markDead() markDead("Application has been stopped.")
sender ! true sender ! true
context.stop(self) context.stop(self)
} }
...@@ -173,9 +171,9 @@ private[spark] class AppClient( ...@@ -173,9 +171,9 @@ private[spark] class AppClient(
} }
} }
def markDead() { def markDead(reason: String) {
if (!alreadyDead) { if (!alreadyDead) {
listener.dead() listener.dead(reason)
alreadyDead = true alreadyDead = true
} }
} }
......
...@@ -30,8 +30,8 @@ private[spark] trait AppClientListener { ...@@ -30,8 +30,8 @@ private[spark] trait AppClientListener {
/** Disconnection may be a temporary state, as we fail over to a new Master. */ /** Disconnection may be a temporary state, as we fail over to a new Master. */
def disconnected(): Unit def disconnected(): Unit
/** Dead means that we couldn't find any Masters to connect to, and have given up. */ /** An application death is an unrecoverable failure condition. */
def dead(): Unit def dead(reason: String): Unit
def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)
......
...@@ -33,8 +33,8 @@ private[spark] object TestClient { ...@@ -33,8 +33,8 @@ private[spark] object TestClient {
System.exit(0) System.exit(0)
} }
def dead() { def dead(reason: String) {
logInfo("Could not connect to master") logInfo("Application died with error: " + reason)
System.exit(0) System.exit(0)
} }
......
...@@ -83,10 +83,10 @@ private[spark] class SparkDeploySchedulerBackend( ...@@ -83,10 +83,10 @@ private[spark] class SparkDeploySchedulerBackend(
} }
} }
override def dead() { override def dead(reason: String) {
if (!stopping) { if (!stopping) {
logError("Spark cluster looks dead, giving up.") logError("Application has been killed. Reason: " + reason)
scheduler.error("Spark cluster looks down") scheduler.error(reason)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment