diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index f19648ec6865ff78c8c80d527f19616f6c956fef..6a0617cc063cd2615becf3dbc885fa8c0badcc07 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = null
+  private var isFinished:Boolean = false
 
   def run() {
     
@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     
     // Wait for the user class to Finish     
     userThread.join()
-     
-    // Finish the ApplicationMaster
-    finishApplicationMaster()
-    // TODO: Exit based on success/failure
+
     System.exit(0)
   }
   
@@ -124,17 +122,30 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       }
     }
   }
-  
+
   private def startUserClass(): Thread  = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
       .getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
-        // Copy
-        var mainArgs: Array[String] = new Array[String](args.userArgs.size())
-        args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
-        mainMethod.invoke(null, mainArgs)
+        var successed = false
+        try {
+          // Copy
+          var mainArgs: Array[String] = new Array[String](args.userArgs.size())
+          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
+          mainMethod.invoke(null, mainArgs)
+          // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
+          // userThread will stop here unless it has uncaught exception thrown out
+          // It need shutdown hook to set SUCCEEDED
+          successed = true
+        } finally {
+          if (successed) {
+            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+          } else {
+            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+          }
+        }
       }
     }
     t.start()
@@ -179,7 +190,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     logInfo("All workers have launched.")
 
     // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    if (userThread.isAlive){
+    if (userThread.isAlive) {
       // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
 
       val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
@@ -197,7 +208,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
     val t = new Thread {
       override def run() {
-        while (userThread.isAlive){
+        while (userThread.isAlive) {
           val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
           if (missingWorkerCount > 0) {
             logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
@@ -235,14 +246,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     }
   }
   */
-  
-  def finishApplicationMaster() { 
+
+  def finishApplicationMaster(status: FinalApplicationStatus) {
+
+    synchronized {
+      if (isFinished) {
+        return
+      }
+      isFinished = true
+    }
+
+    logInfo("finishApplicationMaster with " + status)
     val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
       .asInstanceOf[FinishApplicationMasterRequest]
     finishReq.setAppAttemptId(appAttemptId)
-    // TODO: Check if the application has failed or succeeded
-    finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED)
+    finishReq.setFinishApplicationStatus(status)
     resourceManager.finishApplicationMaster(finishReq)
+
   }
  
 }
@@ -256,7 +276,7 @@ object ApplicationMaster {
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   def incrementAllocatorLoop(by: Int) {
     val count = yarnAllocatorLoop.getAndAdd(by)
-    if (count >= ALLOCATOR_LOOP_WAIT_COUNT){
+    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
       yarnAllocatorLoop.synchronized {
         // to wake threads off wait ...
         yarnAllocatorLoop.notifyAll()
@@ -291,14 +311,16 @@ object ApplicationMaster {
           logInfo("Invoking sc stop from shutdown hook") 
           sc.stop() 
           // best case ...
-          for (master <- applicationMasters) master.finishApplicationMaster
+          for (master <- applicationMasters) {
+            master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+          }
         } 
       } )
     }
 
     // Wait for initialization to complete and atleast 'some' nodes can get allocated
     yarnAllocatorLoop.synchronized {
-      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT){
+      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
         yarnAllocatorLoop.wait(1000L)
       }
     }