From 05bf4e4aff0d052a53d3e64c43688f07e27fec50 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@apache.org>
Date: Mon, 4 Aug 2014 20:39:18 -0700
Subject: [PATCH] [SPARK-2323] Exception in accumulator update should not crash
 DAGScheduler & SparkContext

Author: Reynold Xin <rxin@apache.org>

Closes #1772 from rxin/accumulator-dagscheduler and squashes the following commits:

6a58520 [Reynold Xin] [SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext.
---
 .../org/apache/spark/scheduler/DAGScheduler.scala     |  9 +++++++--
 .../apache/spark/scheduler/DAGSchedulerSuite.scala    | 11 +++--------
 2 files changed, 10 insertions(+), 10 deletions(-)
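
This patch wraps the Accumulators.add call in DAGScheduler.handleTaskCompletion in a try/catch, so an exception thrown while merging accumulator updates on the driver is logged instead of crashing the DAGScheduler and, with it, the SparkContext. The previously ignored DAGSchedulerSuite test is re-enabled and now expects both jobs to complete instead of throwing SparkDriverExecutionException.

For illustration only, a minimal standalone sketch of the behavior the patch enables, modeled on the DAGSchedulerSuite test below; the object name, the local master, and the RuntimeException message are hypothetical and not part of this patch:

    import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

    object MisbehavedAccumulatorSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("accumulator-sketch"))

        // An AccumulatorParam whose driver-side merge (addInPlace) always throws,
        // while the executor-side add (addAccumulator) behaves normally.
        val badParam = new AccumulatorParam[Int] {
          override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
          override def zero(initialValue: Int): Int = 0
          override def addInPlace(r1: Int, r2: Int): Int =
            throw new RuntimeException("misbehaved accumulator")  // hypothetical failure
        }
        val acc = sc.accumulator(0)(badParam)

        // Before this patch, the exception thrown while merging accumulator updates
        // escaped DAGScheduler.handleTaskCompletion and took down the scheduler.
        // With the patch, it is logged and the job still completes.
        sc.parallelize(1 to 10, 2).foreach { _ => acc.add(1) }

        // The SparkContext remains usable for further jobs, as the test asserts.
        assert(sc.parallelize(1 to 10, 2).count() == 10)
        sc.stop()
      }
    }

The patch chooses to log and continue rather than fail the stage (the option the removed TODO pointed at), so a buggy AccumulatorParam in user code cannot take down the scheduler.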

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d87c304898..9fa3a4e9c7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -904,8 +904,13 @@ class DAGScheduler(
     event.reason match {
       case Success =>
         if (event.accumUpdates != null) {
-          // TODO: fail the stage if the accumulator update fails...
-          Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
+          try {
+            Accumulators.add(event.accumUpdates)
+          } catch {
+            // If we see an exception during accumulator update, just log the error and move on.
+            case e: Exception =>
+              logError(s"Failed to update accumulators for $task", e)
+          }
         }
         stage.pendingTasks -= task
         task match {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 36e238b4c9..8c1b0fed11 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -622,8 +622,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
-  // TODO: Fix this and un-ignore the test.
-  ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
+  test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
     val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
       override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
       override def zero(initialValue: Int): Int = 0
@@ -633,14 +632,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     })
 
     // Run this on executors
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
-    }
+    sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
 
     // Run this within a local thread
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
-    }
+    sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
 
     // Make sure we can still run local commands as well as cluster commands.
     assert(sc.parallelize(1 to 10, 2).count() === 10)
-- 
GitLab