Commit 92e385e0 authored by liujianhui, committed by Kay Ousterhout

[SPARK-19868] Conflicting TaskSetManagers can cause Spark to stop

## What changes were proposed in this pull request?

We must set the TaskSetManager to zombie before the DAGScheduler handles the taskEnded event. The taskEnded event can cause the DAGScheduler to launch a new stage attempt (this happens when map output data is lost), and if that happens before the task set has been set to zombie, the scheduler will appear to have two conflicting task sets for the same stage.
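To make the required ordering concrete, here is a minimal, self-contained Scala sketch. `ToyDagScheduler` and `ToyTaskSetManager` are hypothetical stand-ins, not Spark's real classes, and the real `taskEnded` path does far more:

```scala
// Toy model of the ordering this patch enforces: the manager flips isZombie
// *before* notifying the scheduler, so a notification that synchronously
// resubmits the stage never observes a live, conflicting task set.

class ToyDagScheduler {
  // Called on task completion; may decide to launch a new stage attempt.
  def taskEnded(manager: ToyTaskSetManager): Unit = {
    // A new attempt while a non-zombie attempt exists is the
    // "conflicting TaskSetManagers" failure from SPARK-19868.
    require(manager.isZombie, "conflicting TaskSetManagers for one stage")
  }
}

class ToyTaskSetManager(sched: ToyDagScheduler, numTasks: Int) {
  var isZombie = false
  private var tasksSuccessful = 0

  def handleSuccessfulTask(): Unit = {
    // 1. Update our own state first: mark the task set finished ...
    tasksSuccessful += 1
    if (tasksSuccessful == numTasks) isZombie = true
    // 2. ... and only then tell the scheduler. Before this patch the two
    // steps ran in the opposite order, so taskEnded could observe a live
    // (non-zombie) task set and create a conflicting new attempt.
    sched.taskEnded(this)
  }
}

object OrderingDemo extends App {
  val sched = new ToyDagScheduler
  new ToyTaskSetManager(sched, numTasks = 1).handleSuccessfulTask() // passes
}
```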

Author: liujianhui <liujianhui@didichuxing>

Closes #17208 from liujianhuiouc/spark-19868.
parent d4fac410
@@ -713,13 +713,7 @@ private[spark] class TaskSetManager(
       successfulTaskDurations.insert(info.duration)
     }
     removeRunningTask(tid)
-    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
-    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
-    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
-    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
-    // Note: "result.value()" only deserializes the value when it's called at the first time, so
-    // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
+
     // Kill any other attempts for the same task (since those are unnecessary now that one
     // attempt completed successfully).
     for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
@@ -746,6 +740,13 @@ private[spark] class TaskSetManager(
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
+    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
+    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
+    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+    // Note: "result.value()" only deserializes the value when it's called at the first time, so
+    // here "result.value()" just returns the value and won't block other threads.
+    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
     maybeFinishTaskSet()
   }
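As an aside, the moved comment relies on `result.value()` memoizing its deserialization. A toy sketch of that pattern, with hypothetical names (Spark's real `DirectTaskResult` differs):

```scala
// Toy sketch: the first value() call deserializes and caches the result, so
// a later call made while holding the scheduler lock only returns the cached
// object instead of blocking other threads on deserialization (SPARK-7655).
class ToyTaskResult[T](bytes: Array[Byte], deserialize: Array[Byte] => T) {
  private var cached: Option[T] = None

  def value(): T = cached.getOrElse {
    val v = deserialize(bytes) // potentially expensive; call outside any lock
    cached = Some(v)
    v
  }
}

object MemoDemo extends App {
  val result = new ToyTaskResult[String]("ok".getBytes, (b: Array[Byte]) => new String(b))
  result.value()          // deserializes once, e.g. in a result-getter thread
  println(result.value()) // cached: cheap to call under a lock; prints "ok"
}
```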
@@ -22,8 +22,10 @@ import java.util.{Properties, Random}
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer

-import org.mockito.Matchers.{anyInt, anyString}
+import org.mockito.Matchers.{any, anyInt, anyString}
 import org.mockito.Mockito.{mock, never, spy, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer

 import org.apache.spark._
 import org.apache.spark.internal.config
@@ -1056,6 +1058,29 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
     assert(manager.isZombie)
   }

+  test("SPARK-19868: DagScheduler only notified of taskEnd when state is ready") {
+    // dagScheduler.taskEnded() is async, so it may *seem* ok to call it before we've set all
+    // appropriate state, e.g. isZombie. However, this sets up a race that could go the wrong way.
+    // This is a super-focused regression test which checks the zombie state as soon as
+    // dagScheduler.taskEnded() is called, to ensure we haven't introduced a race.
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val mockDAGScheduler = mock(classOf[DAGScheduler])
+    sched.dagScheduler = mockDAGScheduler
+    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1))
+    when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).then(new Answer[Unit] {
+      override def answer(invocationOnMock: InvocationOnMock): Unit = {
+        assert(manager.isZombie === true)
+      }
+    })
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption.isDefined)
+    // This would fail, inside our mock DAGScheduler, if dagScheduler.taskEnded() were called too soon.
+    manager.handleSuccessfulTask(0, createTaskResult(0))
+  }
+
   test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") {
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))