diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 62b5c3bc5f0f3a93461c030b6e3c034654d54c68..46a01f5a9a2cc0fe53918185d52c6f87dc16e483 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -267,12 +267,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       // TODO: This is a bit ugly. Can we make it nicer?
       // TODO: Handle container failure
 
-      // Exists the loop if the user thread exits.
-      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
-        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-          finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of executor failures reached")
-        }
+      // Exits the loop if the user thread exits or the application has finished.
+      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
+          && !isFinished) {
+        checkNumExecutorsFailed()
         yarnAllocator.allocateContainers(
           math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
@@ -303,11 +301,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
     val t = new Thread {
       override def run() {
-        while (userThread.isAlive) {
-          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-            finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of executor failures reached")
-          }
+        while (userThread.isAlive && !isFinished) {
+          checkNumExecutorsFailed()
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
             logInfo("Allocating %d containers to make up for (potentially) lost containers".
@@ -327,6 +322,24 @@
     t
   }
 
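+  // Fails the application and stops the user's SparkContext once the number
+  // of executor failures reaches maxNumExecutorFailures.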
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      logInfo("max number of executor failures reached")
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+      // Make sure to stop the user thread.
+      val sparkContext = ApplicationMaster.sparkContextRef.get()
+      if (sparkContext != null) {
+        logInfo("Invoking sc stop from checkNumExecutorsFailed")
+        sparkContext.stop()
+      } else {
+        logError("sparkContext is null when should shutdown")
+      }
+    }
+  }
+
   private def sendProgress() {
     logDebug("Sending progress")
     // Simulated with an allocate request with no nodes requested ...
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 184e2ad6c82cd959b512e70c087d7ad32064fa86..72c7143edcd71c37c1342eb61ed3b0674cdcef9f 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -249,7 +249,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Wait until all containers have finished
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
-    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
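+    // Exit early once the application has been marked finished.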
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
+        !isFinished) {
       yarnAllocator.allocateContainers(
         math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
       checkNumExecutorsFailed()
@@ -271,7 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     val t = new Thread {
       override def run() {
-        while (!driverClosed) {
+        while (!driverClosed && !isFinished) {
           checkNumExecutorsFailed()
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 035356d390c80b909920b4de20fdb15c15c4f639..9c2bcf17a8508d15325c2e113c25b5bf7949cbbd 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -247,13 +247,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       yarnAllocator.allocateResources()
       // Exits the loop if the user thread exits.
 
-      var iters = 0
-      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
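+      // Also exits early once the application has been marked finished.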
+      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
+          && !isFinished) {
         checkNumExecutorsFailed()
         allocateMissingExecutor()
         yarnAllocator.allocateResources()
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
-        iters += 1
       }
     }
     logInfo("All executors have launched.")
@@ -271,8 +270,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
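+  // Fails the application and stops the user's SparkContext once the number
+  // of executor failures reaches maxNumExecutorFailures.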
   private def checkNumExecutorsFailed() {
     if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      logInfo("max number of executor failures reached")
       finishApplicationMaster(FinalApplicationStatus.FAILED,
         "max number of executor failures reached")
+      // Make sure to stop the user thread.
+      val sparkContext = ApplicationMaster.sparkContextRef.get()
+      if (sparkContext != null) {
+        logInfo("Invoking sc stop from checkNumExecutorsFailed")
+        sparkContext.stop()
+      } else {
+        logError("sparkContext is null when should shutdown")
+      }
     }
   }
 
@@ -289,7 +297,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
     val t = new Thread {
       override def run() {
-        while (userThread.isAlive) {
+        while (userThread.isAlive && !isFinished) {
           checkNumExecutorsFailed()
           allocateMissingExecutor()
           logDebug("Sending progress")
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index fc7b8320d734d77272a12f7b0efb5db417baf86a..a7585748b7f88f951bc5f390f846728f0f5561e4 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -217,7 +217,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Wait until all containers have launched
     yarnAllocator.addResourceRequests(args.numExecutors)
     yarnAllocator.allocateResources()
-    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
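+    // Exit early once the application has been marked finished.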
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
+        !isFinished) {
       checkNumExecutorsFailed()
       allocateMissingExecutor()
       yarnAllocator.allocateResources()
@@ -249,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
     val t = new Thread {
       override def run() {
-        while (!driverClosed) {
+        while (!driverClosed && !isFinished) {
           checkNumExecutorsFailed()
           allocateMissingExecutor()
           logDebug("Sending progress")