diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a177aab5f95dec7541f9fbf0674af44e87a28afb..a41b059fa7decff7ed278c64d383c999852d20e6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -713,13 +713,7 @@ private[spark] class TaskSetManager(
       successfulTaskDurations.insert(info.duration)
     }
     removeRunningTask(tid)
-    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
-    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
-    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
-    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
-    // Note: "result.value()" only deserializes the value when it's called at the first time, so
-    // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
+
     // Kill any other attempts for the same task (since those are unnecessary now that one
     // attempt completed successfully).
     for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
@@ -746,6 +740,15 @@ private[spark] class TaskSetManager(
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
+    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
+    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+    // "deserialize" the value while holding that lock, since deserialization could block other
+    // threads. So we call "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before
+    // reaching here. Note: "result.value()" only deserializes the value when it's called for the
+    // first time, so here "result.value()" just returns the value and won't block other threads.
+    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
     maybeFinishTaskSet()
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 132caef0978fbad2edc6ac94b93fc53d1270413b..9ca6b8b0fe63542a8979f2c45bce1339886a33bf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -22,8 +22,10 @@ import java.util.{Properties, Random}
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.mockito.Matchers.{anyInt, anyString}
+import org.mockito.Matchers.{any, anyInt, anyString}
 import org.mockito.Mockito.{mock, never, spy, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
 
 import org.apache.spark._
 import org.apache.spark.internal.config
@@ -1056,6 +1058,32 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.isZombie)
   }
 
+  test("SPARK-19868: DAGScheduler only notified of taskEnd when state is ready") {
+    // dagScheduler.taskEnded() is async, so it may *seem* ok to call it before we've set all the
+    // appropriate state, e.g. isZombie. However, this sets up a race that could go the wrong way.
+    // This is a super-focused regression test which checks the zombie state as soon as
+    // dagScheduler.taskEnded() is called, to ensure we haven't introduced a race.
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val mockDAGScheduler = mock(classOf[DAGScheduler])
+    sched.dagScheduler = mockDAGScheduler
+    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1))
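+    // Stub taskEnded() so we can assert on the manager's state at the exact moment it is called.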
+    when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).then(new Answer[Unit] {
+      override def answer(invocationOnMock: InvocationOnMock): Unit = {
+        assert(manager.isZombie)
+      }
+    })
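+    // Offer a resource so the single task in the set is actually launched on exec1.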
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption.isDefined)
+    // This will trip the assertion in our mock DAGScheduler if taskEnded() is called too soon.
+    manager.handleSuccessfulTask(0, createTaskResult(0))
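+    // Guard against a vacuous pass: ensure the stubbed taskEnded() above was actually invoked.
+    verify(mockDAGScheduler).taskEnded(any(), any(), any(), any(), any())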
+  }
+
   test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") {
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))