diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index b320be8863a2f137ad85641161554bb804889eae..f599eb00bdff68259e7f08d958d7e8f954d97de9 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -40,7 +40,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     eventQueue.put(HostLost(host))
   }
 
-  // Called by TaskScheduler to cancel an entier TaskSet due to repeated failures.
+  // Called by TaskScheduler to cancel an entire TaskSet due to repeated failures.
   override def taskSetFailed(taskSet: TaskSet, reason: String) {
     eventQueue.put(TaskSetFailed(taskSet, reason))
   }
@@ -54,8 +54,6 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   // resubmit failed stages
   val POLL_TIMEOUT = 10L
 
-  private val lock = new Object          // Used for access to the entire DAGScheduler
-
   private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
 
   val nextRunId = new AtomicInteger(0)
@@ -337,9 +335,12 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
           val rdd = job.finalStage.rdd
           val split = rdd.splits(job.partitions(0))
           val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0)
-          val result = job.func(taskContext, rdd.iterator(split, taskContext))
-          taskContext.executeOnCompleteCallbacks()
-          job.listener.taskSucceeded(0, result)
+          try {
+            val result = job.func(taskContext, rdd.iterator(split, taskContext))
+            job.listener.taskSucceeded(0, result)
+          } finally {
+            taskContext.executeOnCompleteCallbacks()
+          }
         } catch {
           case e: Exception =>
             job.listener.jobFailed(e)
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 19f5328eee087774dfa50b48527e0fa905ea89d0..83641a2a8427b1d86551be7de56edaa8d4ec669a 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -81,7 +81,7 @@ private[spark] class ShuffleMapTask(
   with Externalizable
   with Logging {
 
-  def this() = this(0, null, null, 0, null)
+  protected def this() = this(0, null, null, 0, null)
 
   var split = if (rdd == null) {
     null
@@ -117,34 +117,34 @@ private[spark] class ShuffleMapTask(
 
   override def run(attemptId: Long): MapStatus = {
     val numOutputSplits = dep.partitioner.numPartitions
-    val partitioner = dep.partitioner
 
     val taskContext = new TaskContext(stageId, partition, attemptId)
+    try {
+      // Partition the map output.
+      val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
+      for (elem <- rdd.iterator(split, taskContext)) {
+        val pair = elem.asInstanceOf[(Any, Any)]
+        val bucketId = dep.partitioner.getPartition(pair._1)
+        buckets(bucketId) += pair
+      }
+      val bucketIterators = buckets.map(_.iterator)
 
-    // Partition the map output.
-    val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
-    for (elem <- rdd.iterator(split, taskContext)) {
-      val pair = elem.asInstanceOf[(Any, Any)]
-      val bucketId = partitioner.getPartition(pair._1)
-      buckets(bucketId) += pair
-    }
-    val bucketIterators = buckets.map(_.iterator)
+      val compressedSizes = new Array[Byte](numOutputSplits)
 
-    val compressedSizes = new Array[Byte](numOutputSplits)
+      val blockManager = SparkEnv.get.blockManager
+      for (i <- 0 until numOutputSplits) {
+        val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
+        // Iterator over the (key, value) pairs destined for output split i
+        val iter: Iterator[(Any, Any)] = bucketIterators(i)
+        val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
+        compressedSizes(i) = MapOutputTracker.compressSize(size)
+      }
 
-    val blockManager = SparkEnv.get.blockManager
-    for (i <- 0 until numOutputSplits) {
-      val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
-      // Get a Scala iterator from Java map
-      val iter: Iterator[(Any, Any)] = bucketIterators(i)
-      val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
-      compressedSizes(i) = MapOutputTracker.compressSize(size)
+      return new MapStatus(blockManager.blockManagerId, compressedSizes)
+    } finally {
+      // Execute the callbacks on task completion.
+      taskContext.executeOnCompleteCallbacks()
     }
-
-    // Execute the callbacks on task completion.
-    taskContext.executeOnCompleteCallbacks()
-
-    return new MapStatus(blockManager.blockManagerId, compressedSizes)
   }
 
   override def preferredLocations: Seq[String] = locs