diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e9d2f5757963d46a97764e400c04b9b14ff18c2e..eb14d87467af7fd04b9a33475c5094e83276dc85 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -300,10 +300,17 @@ class SparkContext(config: SparkConf) extends Logging {
 
   // Create and start the scheduler
   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
-  taskScheduler.start()
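+  // @volatile: stop() nulls this field out while other threads may still read it (e.g. the
+  // null check in runJob), so writes must be visible across threads.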
+  @volatile private[spark] var dagScheduler: DAGScheduler = _
+  try {
+    dagScheduler = new DAGScheduler(this)
+  } catch {
+    case e: Exception =>
+      throw new SparkException(
+        "DAGScheduler cannot be initialized due to %s".format(e.getMessage), e)
+  }
 
-  @volatile private[spark] var dagScheduler = new DAGScheduler(this)
-  dagScheduler.start()
+  // Start the TaskScheduler only after the DAGScheduler's constructor has registered this
+  // DAGScheduler with the TaskScheduler (done in DAGSchedulerEventProcessActor.preStart)
+  taskScheduler.start()
 
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
@@ -1022,8 +1029,8 @@ class SparkContext(config: SparkConf) extends Logging {
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    partitions.foreach{ p =>
-      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
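+    // stop() nulls dagScheduler out, so a null here means the context has been stopped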
+    if (dagScheduler == null) {
+      throw new SparkException("SparkContext has been shutdown")
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
@@ -1132,9 +1139,6 @@ class SparkContext(config: SparkConf) extends Logging {
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
-    partitions.foreach{ p =>
-      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
-    }
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e8bbfbf01679cca43fbab26928a5e4897e70df51..3b3524f33e8118b7d409ee02a134bda0ade9500a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1142,9 +1142,9 @@ abstract class RDD[T: ClassTag](
   @transient private var doCheckpointCalled = false
 
   /**
-   * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
-   * after a job using this RDD has completed (therefore the RDD has been materialized and
-   * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
+   * Performs the checkpointing of this RDD by saving this RDD. It is called after a job using
+   * this RDD has completed (therefore the RDD has been materialized and potentially stored in
+   * memory). doCheckpoint() is called recursively on the parent RDDs.
    */
   private[spark] def doCheckpoint() {
     if (!doCheckpointCalled) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dbde9b591dccc32a77146fab47048eefd552824c..ff411e24a3d856b97d7a12da5f01ce3d616d0397 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -22,10 +22,16 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.language.postfixOps
 import scala.reflect.ClassTag
 
 import akka.actor._
+import akka.actor.OneForOneStrategy
+import akka.actor.SupervisorStrategy.Stop
+import akka.pattern.ask
+import akka.util.Timeout
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
@@ -47,14 +53,11 @@ import org.apache.spark.util.Utils
  * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
  * a small number of times before cancelling the whole stage.
  *
- * THREADING: This class runs all its logic in a single thread executing the run() method, to which
- * events are submitted using a synchronized queue (eventQueue). The public API methods, such as
- * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
- * should be private.
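+ * THREADING: All event-handling logic runs on a single Akka actor
+ * (DAGSchedulerEventProcessActor), to which public API methods such as runJob, taskEnded and
+ * executorLost post events asynchronously. The actor is created and supervised by
+ * DAGSchedulerActorSupervisor, which stops the SparkContext if the actor fails.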
  */
 private[spark]
 class DAGScheduler(
-    taskScheduler: TaskScheduler,
+    private[scheduler] val sc: SparkContext,
+    private[scheduler] val taskScheduler: TaskScheduler,
     listenerBus: LiveListenerBus,
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
@@ -65,6 +68,7 @@ class DAGScheduler(
 
   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
     this(
+      sc,
       taskScheduler,
       sc.listenerBus,
       sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
@@ -74,8 +78,6 @@ class DAGScheduler(
 
   def this(sc: SparkContext) = this(sc, sc.taskScheduler)
 
-  private var eventProcessActor: ActorRef = _
-
   private[scheduler] val nextJobId = new AtomicInteger(0)
   private[scheduler] def numTotalJobs: Int = nextJobId.get()
   private val nextStageId = new AtomicInteger(0)
@@ -113,50 +115,31 @@ class DAGScheduler(
   //       stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]
 
-  taskScheduler.setDAGScheduler(this)
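+  // Supervisor that creates the event-processing actor and, if that actor ever crashes,
+  // cancels all jobs and stops the SparkContext (see DAGSchedulerActorSupervisor below)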
+  private val dagSchedulerActorSupervisor =
+    env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
 
-  /**
-   * Starts the event processing actor.  The actor has two responsibilities:
-   *
-   * 1. Waits for events like job submission, task finished, task failure etc., and calls
-   *    [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
-   * 2. Schedules a periodical task to resubmit failed stages.
-   *
-   * NOTE: the actor cannot be started in the constructor, because the periodical task references
-   * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus
-   * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed.
-   */
-  def start() {
-    eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
-      /**
-       * The main event loop of the DAG scheduler.
-       */
-      def receive = {
-        case event: DAGSchedulerEvent =>
-          logTrace("Got event of type " + event.getClass.getName)
-
-          /**
-           * All events are forwarded to `processEvent()`, so that the event processing logic can
-           * easily tested without starting a dedicated actor.  Please refer to `DAGSchedulerSuite`
-           * for details.
-           */
-          if (!processEvent(event)) {
-            submitWaitingStages()
-          } else {
-            context.stop(self)
-          }
-      }
-    }))
+  private[scheduler] var eventProcessActor: ActorRef = _
+
+  private def initializeEventProcessActor() {
+    // Block the calling thread until the supervisor has created the actor, which guarantees
+    // that eventProcessActor is non-null before any job is submitted
+    implicit val timeout = Timeout(30 seconds)
+    val initEventActorReply =
+      dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
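+    // The ask (?) returns a Future[Any]; the supervisor replies with the ActorRef of the
+    // actor it created from the Props (see DAGSchedulerActorSupervisor.receive)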
+    eventProcessActor =
+      Await.result(initEventActorReply, timeout.duration).asInstanceOf[ActorRef]
   }
 
+  initializeEventProcessActor()
+
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessActor ! BeginEvent(task, taskInfo)
   }
 
   // Called to report that a task has completed and results are being fetched remotely.
-  def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
-    eventProcessActor ! GettingResultEvent(task, taskInfo)
+  def taskGettingResult(taskInfo: TaskInfo) {
+    eventProcessActor ! GettingResultEvent(taskInfo)
   }
 
   // Called by TaskScheduler to report task completions or failures.
@@ -436,7 +419,7 @@ class DAGScheduler(
   {
     // Check to make sure we are not launching a task on a partition that does not exist.
     val maxPartitions = rdd.partitions.length
-    partitions.find(p => p >= maxPartitions).foreach { p =>
+    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
       throw new IllegalArgumentException(
         "Attempting to access a non-existent partition: " + p + ". " +
           "Total number of partitions: " + maxPartitions)
@@ -511,6 +494,15 @@ class DAGScheduler(
     eventProcessActor ! AllJobsCancelled
   }
 
+  private[scheduler] def doCancelAllJobs() {
+    // Cancel all running jobs.
+    runningStages.map(_.jobId).foreach(handleJobCancellation(_,
+      reason = "as part of cancellation of all jobs"))
+    activeJobs.clear() // These should already be empty by this point,
+    jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
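+    // Every handler ends with submitWaitingStages(): the old event loop called it after each
+    // processed event, and these per-handler calls preserve that behavior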
+    submitWaitingStages()
+  }
+
   /**
    * Cancel all jobs associated with a running or scheduled stage.
    */
@@ -518,148 +510,30 @@ class DAGScheduler(
     eventProcessActor ! StageCancelled(stageId)
   }
 
-  /**
-   * Process one event retrieved from the event processing actor.
-   *
-   * @param event The event to be processed.
-   * @return `true` if we should stop the event loop.
-   */
-  private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
-    event match {
-      case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
-        var finalStage: Stage = null
-        try {
-          // New stage creation may throw an exception if, for example, jobs are run on a HadoopRDD
-          // whose underlying HDFS files have been deleted.
-          finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
-        } catch {
-          case e: Exception =>
-            logWarning("Creating new stage failed due to exception - job: " + jobId, e)
-            listener.jobFailed(e)
-            return false
-        }
-        val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
-        clearCacheLocs()
-        logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
-                " output partitions (allowLocal=" + allowLocal + ")")
-        logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
-        logInfo("Parents of final stage: " + finalStage.parents)
-        logInfo("Missing parents: " + getMissingParentStages(finalStage))
-        if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
-          // Compute very short actions like first() or take() with no parent stages locally.
-          listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
-          runLocally(job)
-        } else {
-          jobIdToActiveJob(jobId) = job
-          activeJobs += job
-          resultStageToJob(finalStage) = job
-          listenerBus.post(
-            SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties))
-          submitStage(finalStage)
-        }
-
-      case StageCancelled(stageId) =>
-        handleStageCancellation(stageId)
-
-      case JobCancelled(jobId) =>
-        handleJobCancellation(jobId)
-
-      case JobGroupCancelled(groupId) =>
-        // Cancel all jobs belonging to this job group.
-        // First finds all active jobs with this group id, and then kill stages for them.
-        val activeInGroup = activeJobs.filter(activeJob =>
-          groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
-        val jobIds = activeInGroup.map(_.jobId)
-        jobIds.foreach(jobId => handleJobCancellation(jobId,
-          "as part of cancelled job group %s".format(groupId)))
-
-      case AllJobsCancelled =>
-        // Cancel all running jobs.
-        runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId,
-          "as part of cancellation of all jobs"))
-        activeJobs.clear()      // These should already be empty by this point,
-        jobIdToActiveJob.clear()   // but just in case we lost track of some jobs...
-
-      case ExecutorAdded(execId, host) =>
-        handleExecutorAdded(execId, host)
-
-      case ExecutorLost(execId) =>
-        handleExecutorLost(execId)
-
-      case BeginEvent(task, taskInfo) =>
-        for (
-          stage <- stageIdToStage.get(task.stageId);
-          stageInfo <- stageToInfos.get(stage)
-        ) {
-          if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 &&
-              !stageInfo.emittedTaskSizeWarning) {
-            stageInfo.emittedTaskSizeWarning = true
-            logWarning(("Stage %d (%s) contains a task of very large " +
-              "size (%d KB). The maximum recommended task size is %d KB.").format(
-              task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN))
-          }
-        }
-        listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
-
-      case GettingResultEvent(task, taskInfo) =>
-        listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
-
-      case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
-        val stageId = task.stageId
-        val taskType = Utils.getFormattedClassName(task)
-        listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, taskMetrics))
-        handleTaskCompletion(completion)
-
-      case TaskSetFailed(taskSet, reason) =>
-        stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
-
-      case ResubmitFailedStages =>
-        if (failedStages.size > 0) {
-          // Failed stages may be removed by job cancellation, so failed might be empty even if
-          // the ResubmitFailedStages event has been scheduled.
-          resubmitFailedStages()
-        }
-
-      case StopDAGScheduler =>
-        // Cancel any active jobs
-        for (job <- activeJobs) {
-          val error = new SparkException("Job cancelled because SparkContext was shut down")
-          job.listener.jobFailed(error)
-          // Tell the listeners that all of the running stages have ended.  Don't bother
-          // cancelling the stages because if the DAG scheduler is stopped, the entire application
-          // is in the process of getting stopped.
-          val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
-          runningStages.foreach { stage =>
-            val info = stageToInfos(stage)
-            info.stageFailed(stageFailedMessage)
-            listenerBus.post(SparkListenerStageCompleted(info))
-          }
-          listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
-        }
-        return true
-    }
-    false
-  }
-
   /**
    * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since
    * the last fetch failure.
    */
   private[scheduler] def resubmitFailedStages() {
-    logInfo("Resubmitting failed stages")
-    clearCacheLocs()
-    val failedStagesCopy = failedStages.toArray
-    failedStages.clear()
-    for (stage <- failedStagesCopy.sortBy(_.jobId)) {
-      submitStage(stage)
+    if (failedStages.size > 0) {
+      // Failed stages may be removed by job cancellation, so failedStages may be empty even
+      // if the ResubmitFailedStages event has been scheduled.
+      logInfo("Resubmitting failed stages")
+      clearCacheLocs()
+      val failedStagesCopy = failedStages.toArray
+      failedStages.clear()
+      for (stage <- failedStagesCopy.sortBy(_.jobId)) {
+        submitStage(stage)
+      }
     }
+    submitWaitingStages()
   }
 
   /**
    * Check for waiting or failed stages which are now eligible for resubmission.
    * Ordinarily run on every iteration of the event loop.
    */
-  private[scheduler] def submitWaitingStages() {
+  private def submitWaitingStages() {
     // TODO: We might want to run this less often, when we are sure that something has become
     // runnable that wasn't before.
     logTrace("Checking for newly runnable parent stages")
@@ -730,6 +604,102 @@ class DAGScheduler(
     }
   }
 
+  private[scheduler] def handleJobGroupCancelled(groupId: String) {
+    // Cancel all jobs belonging to this job group.
+    // First finds all active jobs with this group id, and then kill stages for them.
+    val activeInGroup = activeJobs.filter(activeJob =>
+      groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+    val jobIds = activeInGroup.map(_.jobId)
+    jobIds.foreach(handleJobCancellation(_,
+      "as part of cancelled job group %s".format(groupId)))
+    submitWaitingStages()
+  }
+
+  private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
+    for (stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage)) {
+      if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 &&
+        !stageInfo.emittedTaskSizeWarning) {
+        stageInfo.emittedTaskSizeWarning = true
+        logWarning(("Stage %d (%s) contains a task of very large " +
+          "size (%d KB). The maximum recommended task size is %d KB.").format(
+            task.stageId, stageInfo.name, taskInfo.serializedSize / 1024,
+            DAGScheduler.TASK_SIZE_TO_WARN))
+      }
+    }
+    listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
+    submitWaitingStages()
+  }
+
+  private[scheduler] def handleTaskSetFailed(taskSet: TaskSet, reason: String) {
+    stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
+    submitWaitingStages()
+  }
+
+  private[scheduler] def cleanUpAfterSchedulerStop() {
+    for (job <- activeJobs) {
+      val error = new SparkException("Job cancelled because SparkContext was shut down")
+      job.listener.jobFailed(error)
+      // Tell the listeners that all of the running stages have ended.  Don't bother
+      // cancelling the stages because if the DAG scheduler is stopped, the entire application
+      // is in the process of getting stopped.
+      val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
+      runningStages.foreach { stage =>
+        val info = stageToInfos(stage)
+        info.stageFailed(stageFailedMessage)
+        listenerBus.post(SparkListenerStageCompleted(info))
+      }
+      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    }
+  }
+
+  private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
+    listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
+    submitWaitingStages()
+  }
+
+  private[scheduler] def handleJobSubmitted(
+      jobId: Int,
+      finalRDD: RDD[_],
+      func: (TaskContext, Iterator[_]) => _,
+      partitions: Array[Int],
+      allowLocal: Boolean,
+      callSite: String,
+      listener: JobListener,
+      properties: Properties = null)
+  {
+    var finalStage: Stage = null
+    try {
+      // New stage creation may throw an exception if, for example, jobs are run on a
+      // HadoopRDD whose underlying HDFS files have been deleted.
+      finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
+    } catch {
+      case e: Exception =>
+        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+        listener.jobFailed(e)
+        return
+    }
+    if (finalStage != null) {
+      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
+      clearCacheLocs()
+      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
+        job.jobId, callSite, partitions.length, allowLocal))
+      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
+      logInfo("Parents of final stage: " + finalStage.parents)
+      logInfo("Missing parents: " + getMissingParentStages(finalStage))
+      if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
+        // Compute very short actions like first() or take() with no parent stages locally.
+        listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
+        runLocally(job)
+      } else {
+        jobIdToActiveJob(jobId) = job
+        activeJobs += job
+        resultStageToJob(finalStage) = job
+        listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,
+          properties))
+        submitStage(finalStage)
+      }
+    }
+    submitWaitingStages()
+  }
+
   /** Submits stage, but first recursively submits any missing parents. */
   private def submitStage(stage: Stage) {
     val jobId = activeJobForStage(stage)
@@ -819,9 +789,12 @@ class DAGScheduler(
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
    */
-  private def handleTaskCompletion(event: CompletionEvent) {
+  private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
     val task = event.task
-
+    val stageId = task.stageId
+    val taskType = Utils.getFormattedClassName(task)
+    listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+      event.taskMetrics))
     if (!stageIdToStage.contains(task.stageId)) {
       // Skip all the actions if the stage has been cancelled.
       return
@@ -964,6 +937,7 @@ class DAGScheduler(
         // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
         // will abort the job.
     }
+    submitWaitingStages()
   }
 
   /**
@@ -973,7 +947,7 @@ class DAGScheduler(
    * Optionally the epoch during which the failure was caught can be passed to avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node as lost.
    */
-  private def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
+  private[scheduler] def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
@@ -993,17 +967,19 @@ class DAGScheduler(
       logDebug("Additional executor lost message for " + execId +
                "(epoch " + currentEpoch + ")")
     }
+    submitWaitingStages()
   }
 
-  private def handleExecutorAdded(execId: String, host: String) {
+  private[scheduler] def handleExecutorAdded(execId: String, host: String) {
     // remove from failedEpoch(execId) ?
     if (failedEpoch.contains(execId)) {
       logInfo("Host added was in lost list earlier: " + host)
       failedEpoch -= execId
     }
+    submitWaitingStages()
   }
 
-  private def handleStageCancellation(stageId: Int) {
+  private[scheduler] def handleStageCancellation(stageId: Int) {
     if (stageIdToJobIds.contains(stageId)) {
       val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
       jobsThatUseStage.foreach(jobId => {
@@ -1012,22 +988,24 @@ class DAGScheduler(
     } else {
       logInfo("No active jobs to kill for Stage " + stageId)
     }
+    submitWaitingStages()
   }
 
-  private def handleJobCancellation(jobId: Int, reason: String = "") {
+  private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
       failJobAndIndependentStages(jobIdToActiveJob(jobId),
         "Job %d cancelled %s".format(jobId, reason), None)
     }
+    submitWaitingStages()
   }
 
   /**
    * Aborts all jobs depending on a particular Stage. This is called in response to a task set
    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
    */
-  private def abortStage(failedStage: Stage, reason: String) {
+  private[scheduler] def abortStage(failedStage: Stage, reason: String) {
     if (!stageIdToStage.contains(failedStage.id)) {
       // Skip all the actions if the stage has been removed.
       return
@@ -1156,13 +1134,88 @@ class DAGScheduler(
   }
 
   def stop() {
-    if (eventProcessActor != null) {
-      eventProcessActor ! StopDAGScheduler
-    }
+    logInfo("Stopping DAGScheduler")
+    dagSchedulerActorSupervisor ! PoisonPill
     taskScheduler.stop()
   }
 }
 
+private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
+  extends Actor with Logging {
+
+  override val supervisorStrategy =
+    OneForOneStrategy() {
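+      // Treat any exception escaping the event-processing actor as fatal: cancel all jobs,
+      // stop the SparkContext, and then stop the failed child actor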
+      case x: Exception =>
+        logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
+          .format(x.getMessage))
+        dagScheduler.doCancelAllJobs()
+        dagScheduler.sc.stop()
+        Stop
+    }
+
+  def receive = {
+    case p: Props => sender ! context.actorOf(p)
+    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
+  }
+}
+
+private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
+  extends Actor with Logging {
+
+  override def preStart() {
+    // Register this DAGScheduler with the TaskScheduler here so that eventProcessActor is
+    // guaranteed to be set before any task events arrive
+    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
+  }
+
+  /**
+   * The main event loop of the DAG scheduler.
+   */
+  def receive = {
+    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
+      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
+        listener, properties)
+
+    case StageCancelled(stageId) =>
+      dagScheduler.handleStageCancellation(stageId)
+
+    case JobCancelled(jobId) =>
+      dagScheduler.handleJobCancellation(jobId)
+
+    case JobGroupCancelled(groupId) =>
+      dagScheduler.handleJobGroupCancelled(groupId)
+
+    case AllJobsCancelled =>
+      dagScheduler.doCancelAllJobs()
+
+    case ExecutorAdded(execId, host) =>
+      dagScheduler.handleExecutorAdded(execId, host)
+
+    case ExecutorLost(execId) =>
+      dagScheduler.handleExecutorLost(execId)
+
+    case BeginEvent(task, taskInfo) =>
+      dagScheduler.handleBeginEvent(task, taskInfo)
+
+    case GettingResultEvent(taskInfo) =>
+      dagScheduler.handleGetTaskResult(taskInfo)
+
+    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
+      dagScheduler.handleTaskCompletion(completion)
+
+    case TaskSetFailed(taskSet, reason) =>
+      dagScheduler.handleTaskSetFailed(taskSet, reason)
+
+    case ResubmitFailedStages =>
+      dagScheduler.resubmitFailedStages()
+  }
+
+  override def postStop() {
+    // Cancel any active jobs in postStop hook
+    dagScheduler.cleanUpAfterSchedulerStop()
+  }
+}
+
 private[spark] object DAGScheduler {
   // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
   // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 0800c5684c60f5ad45972ba16af8239eebf54e3e..23f57441b4b1181603f91656f91ed00389f8d568 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -57,7 +57,7 @@ private[scheduler]
 case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
 
 private[scheduler]
-case class GettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
+case class GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent
 
 private[scheduler] case class CompletionEvent(
     task: Task[_],
@@ -76,5 +76,3 @@ private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
 
 private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
-
-private[scheduler] case object StopDAGScheduler extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a81b834267f0d76a760383c28bf07a57e623ecf3..f3bd0797aa035f76f95e8d127022fea37b33e21f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -465,7 +465,7 @@ private[spark] class TaskSetManager(
   def handleTaskGettingResult(tid: Long) = {
     val info = taskInfos(tid)
     info.markGettingResult()
-    sched.dagScheduler.taskGettingResult(tasks(info.index), info)
+    sched.dagScheduler.taskGettingResult(info)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index ff69eb7e53f8e53033a11e1d81ec55a587ba64da..d172dd1ac8e1b43bb1dfde6364c22c46c601e3cd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -21,6 +21,8 @@ import scala.Tuple2
 import scala.collection.mutable.{HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 
+import akka.actor._
+import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark._
@@ -28,19 +30,16 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 
-/**
- * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
- * rather than spawning an event loop thread as happens in the real code. They use EasyMock
- * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are
- * submitted) and BlockManagerMaster (from which cache locations are retrieved and to which dead
- * host notifications are sent). In addition, tests may check for side effects on a non-mocked
- * MapOutputTracker instance.
- *
- * Tests primarily consist of running DAGScheduler#processEvent and
- * DAGScheduler#submitWaitingStages (via test utility functions like runEvent or respondToTaskSet)
- * and capturing the resulting TaskSets from the mock TaskScheduler.
- */
-class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
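+// An actor that throws on every message, used to exercise the supervisor's failure handling.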
+class BuggyDAGEventProcessActor extends Actor {
+  val state = 0
+  def receive = {
+    case _ => throw new SparkException("error")
+  }
+}
+
+class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite
+  with ImplicitSender with BeforeAndAfter with LocalSparkContext {
+
   val conf = new SparkConf
   /** Set of TaskSets the DAGScheduler has requested executed. */
   val taskSets = scala.collection.mutable.Buffer[TaskSet]()
@@ -82,6 +81,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
 
   var mapOutputTracker: MapOutputTrackerMaster = null
   var scheduler: DAGScheduler = null
+  var dagEventProcessTestActor: TestActorRef[DAGSchedulerEventProcessActor] = null
 
   /**
    * Set of cache locations to return from our mock BlockManagerMaster.
@@ -121,6 +121,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     results.clear()
     mapOutputTracker = new MapOutputTrackerMaster(conf)
     scheduler = new DAGScheduler(
+        sc,
         taskScheduler,
         sc.listenerBus,
         mapOutputTracker,
@@ -131,10 +132,13 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
         runLocallyWithinThread(job)
       }
     }
+    dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
+      Props(classOf[DAGSchedulerEventProcessActor], scheduler))(system)
   }
 
-  after {
-    scheduler.stop()
+  override def afterAll() {
+    super.afterAll()
+    TestKit.shutdownActorSystem(system)
   }
 
   /**
@@ -178,8 +182,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
    * DAGScheduler event loop.
    */
   private def runEvent(event: DAGSchedulerEvent) {
-    assert(!scheduler.processEvent(event))
-    scheduler.submitWaitingStages()
+    dagEventProcessTestActor.receive(event)
   }
 
   /**
@@ -209,7 +212,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
       listener: JobListener = jobListener): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
     runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, listener))
-    return jobId
+    jobId
   }
 
   /** Sends TaskSetFailed to the scheduler. */
@@ -223,19 +226,17 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
   }
 
   test("zero split job") {
-    val rdd = makeRdd(0, Nil)
     var numResults = 0
     val fakeListener = new JobListener() {
       override def taskSucceeded(partition: Int, value: Any) = numResults += 1
       override def jobFailed(exception: Exception) = throw exception
     }
-    submit(rdd, Array(), listener = fakeListener)
+    submit(makeRdd(0, Nil), Array(), listener = fakeListener)
     assert(numResults === 0)
   }
 
   test("run trivial job") {
-    val rdd = makeRdd(1, Nil)
-    submit(rdd, Array(0))
+    submit(makeRdd(1, Nil), Array(0))
     complete(taskSets(0), List((Success, 42)))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty
@@ -529,6 +530,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assertDataStructuresEmpty
   }
 
+  test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") {
+    val actorSystem = ActorSystem("test")
+    val supervisor = actorSystem.actorOf(
+      Props(classOf[DAGSchedulerActorSupervisor], scheduler), "dagSupervisor")
+    supervisor ! Props[BuggyDAGEventProcessActor]
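+    // The supervisor replies with the ActorRef of the child it creates from the Props above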
+    val child = expectMsgType[ActorRef]
+    watch(child)
+    child ! "hi"
+    expectMsgPF() { case Terminated(`child`) => () }
+    assert(scheduler.sc.dagScheduler === null)
+    actorSystem.shutdown()
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 2fb750d9ee3784360affa2051b03f5532a0d9377..a8b605c5b212a24257bcfb6976c04370be726cb3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -305,7 +305,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
       override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
       override def executorAdded(execId: String, host: String) {}
     }
-
+    taskScheduler.setDAGScheduler(dagScheduler)
     // Give zero core offers. Should not generate any tasks
     val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
       new WorkerOffer("executor1", "host1", 0))