From 34719ba32ed421701eaa08bd47ce953cd9267ad7 Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Sat, 3 May 2014 13:27:10 -0700
Subject: [PATCH] SPARK-1689 AppClient should indicate app is dead() when
 removed

Previously, we indicated disconnected(), which keeps the application in a limbo state where it has no executors but thinks it will get them soon.

This is a bug fix that hopefully can be included in 1.0.

Author: Aaron Davidson <aaron@databricks.com>

Closes #605 from aarondav/appremoved and squashes the following commits:

bea02a2 [Aaron Davidson] SPARK-1689 AppClient should indicate app is dead() when removed
---
 .../org/apache/spark/deploy/client/AppClient.scala   | 12 +++++-------
 .../spark/deploy/client/AppClientListener.scala      |  4 ++--
 .../org/apache/spark/deploy/client/TestClient.scala  |  4 ++--
 .../cluster/SparkDeploySchedulerBackend.scala        |  6 +++---
 4 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 8901806de9..57085fc337 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -92,8 +92,7 @@ private[spark] class AppClient(
           if (registered) {
             retryTimer.cancel()
           } else if (retries >= REGISTRATION_RETRIES) {
-            logError("All masters are unresponsive! Giving up.")
-            markDead()
+            markDead("All masters are unresponsive! Giving up.")
           } else {
             tryRegisterAllMasters()
           }
@@ -126,8 +125,7 @@ private[spark] class AppClient(
         listener.connected(appId)
 
       case ApplicationRemoved(message) =>
-        logError("Master removed our application: %s; stopping client".format(message))
-        markDisconnected()
+        markDead("Master removed our application: %s".format(message))
         context.stop(self)
 
       case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
@@ -158,7 +156,7 @@ private[spark] class AppClient(
         logWarning(s"Could not connect to $address: $cause")
 
       case StopAppClient =>
-        markDead()
+        markDead("Application has been stopped.")
         sender ! true
         context.stop(self)
     }
@@ -173,9 +171,9 @@ private[spark] class AppClient(
       }
     }
 
-    def markDead() {
+    def markDead(reason: String) {
       if (!alreadyDead) {
-        listener.dead()
+        listener.dead(reason)
         alreadyDead = true
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
index 1f20aa3dfa..e584952a9a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
@@ -30,8 +30,8 @@ private[spark] trait AppClientListener {
   /** Disconnection may be a temporary state, as we fail over to a new Master. */
   def disconnected(): Unit
 
-  /** Dead means that we couldn't find any Masters to connect to, and have given up. */
-  def dead(): Unit
+  /** An application death is an unrecoverable failure condition. */
+  def dead(reason: String): Unit
 
   def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 888dd45e93..e15a87bd38 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -33,8 +33,8 @@ private[spark] object TestClient {
       System.exit(0)
     }
 
-    def dead() {
-      logInfo("Could not connect to master")
+    def dead(reason: String) {
+      logInfo("Application died with error: " + reason)
       System.exit(0)
     }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9544ca05dc..cefa417299 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -83,10 +83,10 @@ private[spark] class SparkDeploySchedulerBackend(
     }
   }
 
-  override def dead() {
+  override def dead(reason: String) {
     if (!stopping) {
-      logError("Spark cluster looks dead, giving up.")
-      scheduler.error("Spark cluster looks down")
+      logError("Application has been killed. Reason: " + reason)
+      scheduler.error(reason)
     }
   }
 
-- 
GitLab