Commit 05bf4e4a authored by Reynold Xin

[SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext

Author: Reynold Xin <rxin@apache.org>

Closes #1772 from rxin/accumulator-dagscheduler and squashes the following commits:

6a58520 [Reynold Xin] [SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext.
parent 9fd82dbb
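
For context, here is a minimal sketch (not taken from this commit) of the kind of "misbehaved" accumulator the change guards against. The class name and exception message are illustrative, and it assumes that executor-side adds go through addAccumulator while the driver-side merge performed via Accumulators.add goes through addInPlace.

import org.apache.spark.AccumulatorParam

// Illustrative only: an AccumulatorParam whose executor-side add behaves
// normally but whose driver-side merge always throws. The class name and
// exception message are made up for this sketch.
class ThrowingIntParam extends AccumulatorParam[Int] {
  // Used when user code calls acc.add(...) or acc += ... inside a task.
  override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
  // Assumed driver-side merge path exercised by Accumulators.add; throwing
  // here is what previously took down the DAGScheduler.
  override def addInPlace(r1: Int, r2: Int): Int =
    throw new RuntimeException("deliberate failure while merging accumulator updates")
  override def zero(initialValue: Int): Int = 0
}
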
@@ -904,8 +904,13 @@ class DAGScheduler(
     event.reason match {
       case Success =>
         if (event.accumUpdates != null) {
-          // TODO: fail the stage if the accumulator update fails...
-          Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
+          try {
+            Accumulators.add(event.accumUpdates)
+          } catch {
+            // If we see an exception during accumulator update, just log the error and move on.
+            case e: Exception =>
+              logError(s"Failed to update accumulators for $task", e)
+          }
         }
         stage.pendingTasks -= task
         task match {
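
A note on the design choice visible in this hunk, relative to the sketch above: the two removed TODOs considered failing the stage when an accumulator update fails (and skipping the update for resubmitted tasks), but the change instead catches the exception, logs it, and moves on. As the issue title states, an exception escaping here would otherwise crash the DAGScheduler and, with it, the SparkContext, so a faulty user-supplied merge function is now contained to a logged error.
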
@@ -622,8 +622,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }

-  // TODO: Fix this and un-ignore the test.
-  ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
+  test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
     val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
       override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
       override def zero(initialValue: Int): Int = 0
@@ -633,14 +632,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     })
     // Run this on executors
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
-    }
+    sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
     // Run this within a local thread
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
-    }
+    sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
     // Make sure we can still run local commands as well as cluster commands.
     assert(sc.parallelize(1 to 10, 2).count() === 10)
...
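
Below is a hedged, end-to-end sketch of the behaviour the now-enabled test exercises, reusing the illustrative ThrowingIntParam from the sketch above; the object name, local[2] master, and app name are assumptions for the example, not part of the commit.

import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object MisbehavedAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accumulator-demo"))
    val acc = new Accumulator[Int](0, new ThrowingIntParam)

    // Runs user code on executors; merging the task-side updates on the driver
    // throws, but with this change the DAGScheduler only logs the failure.
    sc.parallelize(1 to 10, 2).foreach { _ => acc.add(1) }

    // Same idea through a take(), mirroring the test's second action.
    sc.parallelize(1 to 10, 2).map { i => acc.add(1); i }.take(1)

    // The SparkContext is still usable for further jobs.
    assert(sc.parallelize(1 to 10, 2).count() === 10)
    sc.stop()
  }
}
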