diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index d3674427b12717a02f011379087df2e7d0604bc8..c3ca43f8d07343ef20474777d08102f758cd14bf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -96,11 +96,13 @@ private[spark] class ApplicationInfo(
 
   def retryCount = _retryCount
 
-  def incrementRetryCount = {
+  def incrementRetryCount() = {
     _retryCount += 1
     _retryCount
   }
 
+  def resetRetryCount() = _retryCount = 0
+
   def markFinished(endState: ApplicationState.Value) {
     state = endState
     endTime = System.currentTimeMillis()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2a66fcfe4801c634fd8f42e904d25f9852632e6b..a3909d6ea95c0ea42e3e5f7e76ba8f39e7a6fc37 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -296,28 +296,34 @@ private[spark] class Master(
       val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
       execOption match {
         case Some(exec) => {
+          val appInfo = idToApp(appId)
           exec.state = state
+          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
           exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
           if (ExecutorState.isFinished(state)) {
-            val appInfo = idToApp(appId)
             // Remove this executor from the worker and app
-            logInfo("Removing executor " + exec.fullId + " because it is " + state)
+            logInfo(s"Removing executor ${exec.fullId} because it is $state")
             appInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
 
-            val normalExit = exitStatus.exists(_ == 0)
+            val normalExit = exitStatus == Some(0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
-              schedule()
-            } else if (!normalExit) {
-              logError("Application %s with ID %s failed %d times, removing it".format(
-                appInfo.desc.name, appInfo.id, appInfo.retryCount))
-              removeApplication(appInfo, ApplicationState.FAILED)
+            if (!normalExit) {
+              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
+                schedule()
+              } else {
+                val execs = appInfo.executors.values
+                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+                    s"${appInfo.retryCount} times; removing it")
+                  removeApplication(appInfo, ApplicationState.FAILED)
+                }
+              }
             }
           }
         }
         case None =>
-          logWarning("Got status update for unknown executor " + appId + "/" + execId)
+          logWarning(s"Got status update for unknown executor $appId/$execId")
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 7be89f9aff0f30db2854e80fc9232a9211811a17..00a43673e5cd3e545d5d2c4bb9375bea28490418 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -159,6 +159,8 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
+      state = ExecutorState.RUNNING
+      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
       // or with nonzero exit code
       val exitCode = process.waitFor()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index e475567db6a20eb09036edf26233153a57254e66..0c454e4138c968c2d30d09e3f1c30d8c6d5ab01f 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -234,7 +234,7 @@ private[spark] class Worker(
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
+            self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_