From 8d271c90fa496cb24e2b7362ef0497563591b97d Mon Sep 17 00:00:00 2001
From: Zhen Peng <zhenpeng01@baidu.com>
Date: Mon, 26 May 2014 21:30:25 -0700
Subject: [PATCH] SPARK-1929 DAGScheduler suspended by local task OOM

DAGScheduler does not handle local task OOM properly, and will wait for the job result forever.

Author: Zhen Peng <zhenpeng01@baidu.com>

Closes #883 from zhpengg/bugfix-dag-scheduler-oom and squashes the following commits:

76f7eda [Zhen Peng] remove redundant memory allocations
aa63161 [Zhen Peng] SPARK-1929 DAGScheduler suspended by local task OOM
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  6 +++++-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 14 ++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ff411e24a3..c70aa0e6e4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.NotSerializableException
+import java.io.{NotSerializableException, PrintWriter, StringWriter}
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -580,6 +580,10 @@ class DAGScheduler(
       case e: Exception =>
         jobResult = JobFailed(e)
         job.listener.jobFailed(e)
+      case oom: OutOfMemoryError =>
+        val exception = new SparkException("job failed for Out of memory exception", oom)
+        jobResult = JobFailed(exception)
+        job.listener.jobFailed(exception)
     } finally {
       val s = job.finalStage
       stageIdToJobIds -= s.id    // clean up data structures that were populated for a local job,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d172dd1ac8..81e64c1846 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -256,6 +256,20 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("local job oom") {
+    val rdd = new MyRDD(sc, Nil) {
+      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+        throw new java.lang.OutOfMemoryError("test local job oom")
+      override def getPartitions = Array( new Partition { override def index = 0 } )
+      override def getPreferredLocations(split: Partition) = Nil
+      override def toString = "DAGSchedulerSuite Local RDD"
+    }
+    val jobId = scheduler.nextJobId.getAndIncrement()
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    assert(results.size == 0)
+    assertDataStructuresEmpty
+  }
+
   test("run trivial job w/ dependency") {
     val baseRdd = makeRdd(1, Nil)
     val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
-- 
GitLab