diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2e46d750c4a3801f5d4cc381dcd2f8dd759561a3..560e5de358908c13c9dbbff3ffaa95c6c7138e5d 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -65,6 +65,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
 
+  private var registered = false
+
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
@@ -110,7 +112,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     waitForSparkContextInitialized()
 
     // Do this after spark master is up and SparkContext is created so that we can register UI Url
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+    synchronized {
+      if (!isFinished) {
+        registerApplicationMaster()
+        registered = true
+      }
+    }
 
     // Allocate all containers
     allocateWorkers()
@@ -208,7 +215,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         var count = 0
         val waitTime = 10000L
         val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
-        while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
+        while (ApplicationMaster.sparkContextRef.get() == null && count < numTries
+            && !isFinished) {
           logInfo("Waiting for spark context initialization ... " + count)
           count = count + 1
           ApplicationMaster.sparkContextRef.wait(waitTime)
@@ -341,17 +349,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         return
       }
       isFinished = true
+
+      logInfo("finishApplicationMaster with " + status)
+      if (registered) {
+        val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+          .asInstanceOf[FinishApplicationMasterRequest]
+        finishReq.setAppAttemptId(appAttemptId)
+        finishReq.setFinishApplicationStatus(status)
+        finishReq.setDiagnostics(diagnostics)
+        // Set tracking url to empty since we don't have a history server.
+        finishReq.setTrackingUrl("")
+        resourceManager.finishApplicationMaster(finishReq)
+      }
     }
-
-    logInfo("finishApplicationMaster with " + status)
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    finishReq.setFinishApplicationStatus(status)
-    finishReq.setDiagnostics(diagnostics)
-    // Set tracking url to empty since we don't have a history server.
-    finishReq.setTrackingUrl("")
-    resourceManager.finishApplicationMaster(finishReq)
   }
 
   /**
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4b777d5fa7a283e78744923dce484b13b1cc1431..0f58c49c695835ba4c76d5932d8a60b82f434b10 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -67,6 +67,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))
 
+  private var registered = false
+
   def run() {
     // Setup the directories so things go to YARN approved directories rather
     // than user specified and /tmp.
@@ -99,7 +101,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     waitForSparkContextInitialized()
 
     // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+    synchronized {
+      if (!isFinished) {
+        registerApplicationMaster()
+        registered = true
+      }
+    }
 
     // Allocate all containers
     allocateWorkers()
@@ -180,7 +187,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         var numTries = 0
         val waitTime = 10000L
         val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
-        while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) {
+        while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries
+            && !isFinished) {
           logInfo("Waiting for Spark context initialization ... " + numTries)
           numTries = numTries + 1
           ApplicationMaster.sparkContextRef.wait(waitTime)
@@ -313,11 +321,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         return
       }
       isFinished = true
-    }
 
-    logInfo("finishApplicationMaster with " + status)
-    // Set tracking URL to empty since we don't have a history server.
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+      logInfo("finishApplicationMaster with " + status)
+      if (registered) {
+        // Set tracking URL to empty since we don't have a history server.
+        amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+      }
+    }
   }
 
   /**