diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 8901806de92621e1f53a3e3f3b18cb535c081183..57085fc337148c5401da0da3ac9bdfab9ab686d8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -92,8 +92,7 @@ private[spark] class AppClient(
           if (registered) {
             retryTimer.cancel()
           } else if (retries >= REGISTRATION_RETRIES) {
-            logError("All masters are unresponsive! Giving up.")
-            markDead()
+            markDead("All masters are unresponsive! Giving up.")
           } else {
             tryRegisterAllMasters()
           }
@@ -126,8 +125,7 @@ private[spark] class AppClient(
         listener.connected(appId)
 
       case ApplicationRemoved(message) =>
-        logError("Master removed our application: %s; stopping client".format(message))
-        markDisconnected()
+        markDead("Master removed our application: %s".format(message))
         context.stop(self)
 
       case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
@@ -158,7 +156,7 @@ private[spark] class AppClient(
         logWarning(s"Could not connect to $address: $cause")
 
       case StopAppClient =>
-        markDead()
+        markDead("Application has been stopped.")
         sender ! true
         context.stop(self)
     }
@@ -173,9 +171,9 @@ private[spark] class AppClient(
       }
     }
 
-    def markDead() {
+    def markDead(reason: String) {
       if (!alreadyDead) {
-        listener.dead()
+        listener.dead(reason)
         alreadyDead = true
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
index 1f20aa3dfa39bfe747aca9103ddc7c59a646edb5..e584952a9ad85a969971891c7d6eb6062a63b2aa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
@@ -30,8 +30,8 @@ private[spark] trait AppClientListener {
   /** Disconnection may be a temporary state, as we fail over to a new Master. */
   def disconnected(): Unit
 
-  /** Dead means that we couldn't find any Masters to connect to, and have given up. */
-  def dead(): Unit
+  /** An application death is an unrecoverable failure condition. */
+  def dead(reason: String): Unit
 
   def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 888dd45e93c6a73a369a6fa6e406e1e440eaad11..e15a87bd38fda7f6391e03c5f1a60ad9cd207ff9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -33,8 +33,8 @@ private[spark] object TestClient {
       System.exit(0)
     }
 
-    def dead() {
-      logInfo("Could not connect to master")
+    def dead(reason: String) {
+      logInfo("Application died with error: " + reason)
       System.exit(0)
     }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9544ca05dca700180da20d42ae9bd6f10090c3a8..cefa41729964a4c640d5f4e79fd147c43e205b0f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -83,10 +83,10 @@ private[spark] class SparkDeploySchedulerBackend(
     }
   }
 
-  override def dead() {
+  override def dead(reason: String) {
     if (!stopping) {
-      logError("Spark cluster looks dead, giving up.")
-      scheduler.error("Spark cluster looks down")
+      logError("Application has been killed. Reason: " + reason)
+      scheduler.error(reason)
     }
   }