From b4546ba9e694529c359b7ca5c26829ead2c07f1a Mon Sep 17 00:00:00 2001
From: Kay Ousterhout <kayousterhout@gmail.com>
Date: Thu, 14 Nov 2013 13:33:11 -0800
Subject: [PATCH] Fix bug where scheduler could hang after task failure.

When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager indicating which tasks are
pending may be updated after reviveOffers() is called (there is a
race condition here), so at the time reviveOffers() runs, the task set
manager may not yet realize that there are failed tasks that need
to be relaunched.
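
A minimal, self-contained Scala sketch of the corrected ordering (not part
of this patch; the names SketchTaskSetManager, SketchScheduler and
recordFailure are invented for illustration, not Spark APIs): the failure is
recorded in the task set manager's pending-task bookkeeping first, and only
then are offers revived, so the re-queued task is visible when the new round
of offers arrives.

    object TaskState extends Enumeration {
      val RUNNING, FINISHED, FAILED, KILLED, LOST = Value
    }

    class SketchTaskSetManager {
      import scala.collection.mutable
      private val pendingTasks = mutable.Queue[Long]()

      // Record a failed task so it is offered again on the next resource offer.
      def recordFailure(tid: Long): Unit = synchronized { pendingTasks.enqueue(tid) }

      def hasPendingTasks: Boolean = synchronized { pendingTasks.nonEmpty }
    }

    class SketchScheduler(manager: SketchTaskSetManager, reviveOffers: () => Unit) {
      // Mirrors the shape of ClusterScheduler.handleFailedTask after this patch:
      // the task set manager's state is updated first, then offers are revived
      // for any terminal state other than KILLED.
      def handleFailedTask(tid: Long, state: TaskState.Value): Unit = synchronized {
        manager.recordFailure(tid)
        if (state != TaskState.KILLED) {
          reviveOffers()
        }
      }
    }

    // Example: the failed task is recorded before offers are revived.
    val tsm   = new SketchTaskSetManager
    val sched = new SketchScheduler(tsm, () => println("reviveOffers() called"))
    sched.handleFailedTask(42L, TaskState.FAILED)

With this ordering there is no window in which reviveOffers() runs before the
manager knows about the failed task, which is the hang the patch below fixes.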
---
 .../spark/scheduler/cluster/ClusterScheduler.scala  | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 53a589615d..c1e65a3c48 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,7 +249,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
-    var taskFailed = false
     synchronized {
       try {
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -269,9 +268,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
               }
               taskIdToExecutorId.remove(tid)
             }
-            if (state == TaskState.FAILED) {
-              taskFailed = true
-            }
             activeTaskSets.get(taskSetId).foreach { taskSet =>
               if (state == TaskState.FINISHED) {
                 taskSet.removeRunningTask(tid)
@@ -293,10 +289,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       dagScheduler.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
-    if (taskFailed) {
-      // Also revive offers if a task had failed for some reason other than host lost
-      backend.reviveOffers()
-    }
   }
 
   def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) {
@@ -316,8 +308,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     taskState: TaskState,
     reason: Option[TaskEndReason]) = synchronized {
     taskSetManager.handleFailedTask(tid, taskState, reason)
-    if (taskState == TaskState.FINISHED) {
-      // The task finished successfully but the result was lost, so we should revive offers.
+    if (taskState != TaskState.KILLED) {
+      // Need to revive offers again now that the task set manager state has been updated to
+      // reflect failed tasks that need to be re-run.
       backend.reviveOffers()
     }
   }
-- 
GitLab