diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5c7bca45412221de2bd5fe2b36d02dd231360ff5..9c66c785848a5468ee6eefe1ec6dfa1f9ed3b847 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -137,15 +137,7 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf
 
-    try {
-      val args = new ClientArguments(argStrings, sparkConf)
-      new Client(args, sparkConf).run()
-    } catch {
-      case e: Exception =>
-        Console.err.println(e.getMessage)
-        System.exit(1)
-    }
-
-    System.exit(0)
+    val args = new ClientArguments(argStrings, sparkConf)
+    new Client(args, sparkConf).run()
   }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 0efac4ea6370294b83bd56afee47f94c7dd49c46..fb0e34bf5985ecea71229766eb5d666ffc15a7aa 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -417,17 +417,19 @@ private[spark] trait ClientBase extends Logging {
 
   /**
    * Report the state of an application until it has exited, either successfully or
-   * due to some failure, then return the application state.
+   * due to some failure, then return a pair of the YARN application state (FINISHED, FAILED,
+   * KILLED, or RUNNING) and the final application status (UNDEFINED, SUCCEEDED, FAILED,
+   * or KILLED).
    *
    * @param appId ID of the application to monitor.
    * @param returnOnRunning Whether to also return the application state when it is RUNNING.
    * @param logApplicationReport Whether to log details of the application report every iteration.
-   * @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING.
+   * @return A pair of the YARN application state and the final application status.
    */
   def monitorApplication(
       appId: ApplicationId,
       returnOnRunning: Boolean = false,
-      logApplicationReport: Boolean = true): YarnApplicationState = {
+      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
     val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
     var lastState: YarnApplicationState = null
     while (true) {
@@ -468,11 +470,11 @@ private[spark] trait ClientBase extends Logging {
       if (state == YarnApplicationState.FINISHED ||
         state == YarnApplicationState.FAILED ||
         state == YarnApplicationState.KILLED) {
-        return state
+        return (state, report.getFinalApplicationStatus)
       }
 
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
-        return state
+        return (state, report.getFinalApplicationStatus)
       }
 
       lastState = state
@@ -485,8 +487,23 @@ private[spark] trait ClientBase extends Logging {
   /**
    * Submit an application to the ResourceManager and monitor its state.
    * This continues until the application has exited for any reason.
+   * If the application finishes with a failed, killed, or undefined status,
+   * throw an appropriate SparkException.
    */
-  def run(): Unit = monitorApplication(submitApplication())
+  def run(): Unit = {
+    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
+    if (yarnApplicationState == YarnApplicationState.FAILED ||
+      finalApplicationStatus == FinalApplicationStatus.FAILED) {
+      throw new SparkException("Application finished with failed status")
+    }
+    if (yarnApplicationState == YarnApplicationState.KILLED ||
+      finalApplicationStatus == FinalApplicationStatus.KILLED) {
+      throw new SparkException("Application is killed")
+    }
+    if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+      throw new SparkException("The final status of application is undefined")
+    }
+  }
 
   /* --------------------------------------------------------------------------------------- *
    |  Methods that cannot be implemented here due to API differences across hadoop versions  |
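
For context, a minimal sketch (not part of this patch) of how a caller might consume the new contract: monitorApplication now returns a (YarnApplicationState, FinalApplicationStatus) pair, and run() signals failure by throwing a SparkException instead of calling System.exit. The client, args, appId, and sparkConf values below are assumed to be constructed elsewhere.

```scala
import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, YarnApplicationState}
import org.apache.spark.SparkException

// Destructure the pair returned by the new monitorApplication signature.
val (state, finalStatus) = client.monitorApplication(appId)  // blocks until the app exits
if (state == YarnApplicationState.FINISHED &&
    finalStatus == FinalApplicationStatus.SUCCEEDED) {
  println(s"Application $appId succeeded")
}

// Alternatively, rely on run(), which now throws rather than exiting the JVM,
// so embedding callers (such as tests) can observe the failure directly:
try {
  new Client(args, sparkConf).run()
} catch {
  case e: SparkException =>
    // e.g. "Application finished with failed status"
    Console.err.println(e.getMessage)
}
```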
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 6bb4b82316ad49190936fe819fb2da5169f7268b..d948a2aeedd457ffd6afaa913bb399daa3c8ab54 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -99,7 +99,7 @@ private[spark] class YarnClientSchedulerBackend(
    */
   private def waitForApplication(): Unit = {
     assert(client != null && appId != null, "Application has not been submitted yet!")
-    val state = client.monitorApplication(appId, returnOnRunning = true) // blocking
+    val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking
     if (state == YarnApplicationState.FINISHED ||
       state == YarnApplicationState.FAILED ||
       state == YarnApplicationState.KILLED) {
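
Since waitForApplication only needs the YARN state, the final status is discarded with the underscore pattern. For illustration, a hedged sketch of a caller that inspects both elements of the pair, assuming the same client and appId as in the patch:

```scala
// With returnOnRunning = true, monitorApplication also returns once the
// application reaches RUNNING; the final status is typically UNDEFINED then.
val (state, finalStatus) = client.monitorApplication(appId, returnOnRunning = true)
if (state == YarnApplicationState.RUNNING) {
  println(s"Application $appId is running (final status so far: $finalStatus)")
}
```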
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0b43e6ee205384b0ef610311f6ac900d5e49173f..addaddb711d3ca222e75dad98008321c2af9a9e3 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -135,15 +135,7 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf
 
-    try {
-      val args = new ClientArguments(argStrings, sparkConf)
-      new Client(args, sparkConf).run()
-    } catch {
-      case e: Exception =>
-        Console.err.println(e.getMessage)
-        System.exit(1)
-    }
-
-    System.exit(0)
+    val args = new ClientArguments(argStrings, sparkConf)
+    new Client(args, sparkConf).run()
   }
 }
diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index a826b2a78a8f5a30d37dd09267e3e3d16df76dde..d79b85e867fcd5b02423944008af0e9b3a5bbde6 100644
--- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
 
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
@@ -123,21 +123,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
     var result = File.createTempFile("result", null, tempDir)
 
-    // The Client object will call System.exit() after the job is done, and we don't want
-    // that because it messes up the scalatest monitoring. So replicate some of what main()
-    // does here.
     val args = Array("--class", main,
       "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
       "--arg", "yarn-cluster",
       "--arg", result.getAbsolutePath(),
       "--num-executors", "1")
-    val sparkConf = new SparkConf()
-    val yarnConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-    val clientArgs = new ClientArguments(args, sparkConf)
-    new Client(clientArgs, yarnConf, sparkConf).run()
+    Client.main(args)
     checkResult(result)
   }
 
+  test("run Spark in yarn-cluster mode unsuccessfully") {
+    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+
+    // Pass only one "--arg" (omitting the result file path) so the driver will fail
+    val args = Array("--class", main,
+      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--num-executors", "1")
+    val exception = intercept[SparkException] {
+      Client.main(args)
+    }
+    assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
+  }
+
   /**
    * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
    * any sort of error when the job process finishes successfully, but the job itself fails. So