diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index e492279b4ec6444dfc793f0a0a20309c60b8c399..2aad7956b41c4414ea3a15f57fbeb5d1099039f5 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -15,9 +15,11 @@ private[spark] class ResultTask[T, U](
 
   override def run(attemptId: Long): U = {
     val context = new TaskContext(stageId, partition, attemptId)
-    val result = func(context, rdd.iterator(split, context))
-    context.executeOnCompleteCallbacks()
-    result
+    try {
+      func(context, rdd.iterator(split, context))
+    } finally {
+      context.executeOnCompleteCallbacks()
+    }
   }
 
   override def preferredLocations: Seq[String] = locs
diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f9378773406d309ff980cb9981458efba4469697
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
@@ -0,0 +1,43 @@
+package spark.scheduler
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import spark.TaskContext
+import spark.RDD
+import spark.SparkContext
+import spark.Split
+
+class TaskContextSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
+  }
+
+  test("Calls executeOnCompleteCallbacks after failure") {
+    var completed = false
+    sc = new SparkContext("local", "test")
+    val rdd = new RDD[String](sc) {
+      override val splits = Array[Split](StubSplit(0))
+      override val dependencies = List()
+      override def compute(split: Split, context: TaskContext) = {
+        context.addOnCompleteCallback(() => completed = true)
+        sys.error("failed")
+      }
+    }
+    val func = (c: TaskContext, i: Iterator[String]) => i.next
+    val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
+    intercept[RuntimeException] {
+      task.run(0)
+    }
+    assert(completed === true)
+  }
+
+  case class StubSplit(val index: Int) extends Split
+}
\ No newline at end of file