diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 53a589615d2a4ea12c8ed37192ff0d0fa8577d34..c1e65a3c488de624bcd8f3b9959ef5a58489882a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,7 +249,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
-    var taskFailed = false
     synchronized {
       try {
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -269,9 +268,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
               }
               taskIdToExecutorId.remove(tid)
             }
-            if (state == TaskState.FAILED) {
-              taskFailed = true
-            }
             activeTaskSets.get(taskSetId).foreach { taskSet =>
               if (state == TaskState.FINISHED) {
                 taskSet.removeRunningTask(tid)
@@ -293,10 +289,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       dagScheduler.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
-    if (taskFailed) {
-      // Also revive offers if a task had failed for some reason other than host lost
-      backend.reviveOffers()
-    }
   }
 
   def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) {
@@ -316,8 +308,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       taskState: TaskState,
       reason: Option[TaskEndReason]) = synchronized {
    taskSetManager.handleFailedTask(tid, taskState, reason)
-    if (taskState == TaskState.FINISHED) {
-      // The task finished successfully but the result was lost, so we should revive offers.
+    if (taskState != TaskState.KILLED) {
+      // Need to revive offers again now that the task set manager state has been updated to
+      // reflect failed tasks that need to be re-run.
       backend.reviveOffers()
     }
   }
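Below the patch is a minimal, self-contained Scala sketch (not part of the patch; the object and method names are hypothetical) of the revive-offers decision that the revised ClusterScheduler.handleFailedTask applies: once the TaskSetManager has recorded the failure, offers are revived for every terminal state except KILLED, so re-enqueued failed tasks get rescheduled even when no executor was lost.

// Hypothetical illustration only; TaskStateSketch stands in for org.apache.spark.TaskState.
object ReviveOffersSketch {
  object TaskStateSketch extends Enumeration {
    val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
  }
  import TaskStateSketch._

  // Mirrors the patched condition in handleFailedTask: revive offers for anything but KILLED.
  def shouldReviveOffers(taskState: TaskStateSketch.Value): Boolean =
    taskState != KILLED

  def main(args: Array[String]): Unit = {
    Seq(FINISHED, FAILED, KILLED, LOST).foreach { state =>
      println(s"$state -> reviveOffers = ${shouldReviveOffers(state)}")
    }
  }
}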