diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ff411e24a3d856b97d7a12da5f01ce3d616d0397..c70aa0e6e452331cd466216a33f1d85e175db3f7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -580,6 +580,13 @@ class DAGScheduler(
       case e: Exception =>
         jobResult = JobFailed(e)
         job.listener.jobFailed(e)
+      case oom: OutOfMemoryError =>
+        val exception = new SparkException("job failed for Out of memory exception", oom)
+        jobResult = JobFailed(exception)
+        job.listener.jobFailed(exception)
     } finally {
       val s = job.finalStage
       stageIdToJobIds -= s.id    // clean up data structures that were populated for a local job,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d172dd1ac8e1b43bb1dfde6364c22c46c601e3cd..81e64c1846ed5dceb30a8647213cd1c6a91ce90a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -256,6 +256,24 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("local job oom") {
+    val rdd = new MyRDD(sc, Nil) {
+      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+        throw new java.lang.OutOfMemoryError("test local job oom")
+      override def getPartitions = Array(new Partition { override def index = 0 })
+      override def getPreferredLocations(split: Partition) = Nil
+      override def toString = "DAGSchedulerSuite Local RDD"
+    }
+    val jobId = scheduler.nextJobId.getAndIncrement()
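+    // allowLocal = true routes this single-partition job through
+    // runLocallyWithinThread, where the new OutOfMemoryError handler lives.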
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    assert(results.isEmpty)
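+    // the finally-block cleanup should have run even though the job OOM'd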
+    assertDataStructuresEmpty
+  }
+
   test("run trivial job w/ dependency") {
     val baseRdd = makeRdd(1, Nil)
     val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))