diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d87c3048985fcd3fce3321b393bc543ba14de55b..9fa3a4e9c71aeefab38a96c95e78418feef8b8e0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -904,8 +904,13 @@ class DAGScheduler(
     event.reason match {
       case Success =>
         if (event.accumUpdates != null) {
-          // TODO: fail the stage if the accumulator update fails...
-          Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
+          try {
+            Accumulators.add(event.accumUpdates)
+          } catch {
+            // If we see an exception during accumulator update, just log the error and move on.
+            case e: Exception =>
+              logError(s"Failed to update accumulators for $task", e)
+          }
         }
         stage.pendingTasks -= task
         task match {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 36e238b4c9434cc7cc2d006bdb3dc1086d7ac36c..8c1b0fed11f725ede71f8c6e39b05f1f9177462b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -622,8 +622,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
-  // TODO: Fix this and un-ignore the test.
-  ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
+  test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
     val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
       override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
       override def zero(initialValue: Int): Int = 0
@@ -633,14 +632,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     })
 
     // Run this on executors
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
-    }
+    sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
 
     // Run this within a local thread
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
-    }
+    sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
 
     // Make sure we can still run local commands as well as cluster commands.
     assert(sc.parallelize(1 to 10, 2).count() === 10)