diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 3d3b9ea01181ce1004c388b7f6b6e693887dae66..8173ef709dc4fb6d017c1eee204bceb8a2c9dda2 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -251,6 +251,15 @@ class DAGScheduler(
     if (partitions.size == 0) {
       return
     }
+
+    // Make sure we are not launching a task on a partition that does not exist.
+    val maxPartitions = finalRdd.partitions.length
+    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
+      throw new IllegalArgumentException(
+        "Attempting to access a non-existent partition: " + p + ". " +
+        "Total number of partitions: " + maxPartitions)
+    }
+
     val (toSubmit, waiter) = prepareJob(
         finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties)
     eventQueue.put(toSubmit)
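
A minimal sketch of how the new guard surfaces to callers, assuming a live
SparkContext named sc (the RDD and partition indices below are illustrative):

    val rdd = sc.parallelize(1 to 10, 2)  // two partitions: indices 0 and 1
    // Requesting partition 2 now fails fast in runJob, before the job is enqueued:
    sc.runJob(rdd, (iter: Iterator[Int]) => iter.size, Seq(2), false)
    // java.lang.IllegalArgumentException: Attempting to access a non-existent
    // partition: 2. Total number of partitions: 2
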
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index aa3ee5f5eea17273ada672806c8d572580a99c5a..7f7d4c821154eafc181fbcab6a2fe9c96b394eea 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -302,4 +302,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
       assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
     }
   }
+
+  test("runJob on an invalid partition") {
+    intercept[IllegalArgumentException] {
+      sc.runJob(sc.parallelize(1 to 10, 2), { iter: Iterator[Int] => iter.size }, Seq(0, 1, 2), false)
+    }
+  }
 }
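
For completeness, a sketch of the negative-index case, which the p < 0 arm of
the guard rejects in the same way (illustrative only; assumes the same
SharedSparkContext sc as the test above):

    intercept[IllegalArgumentException] {
      sc.runJob(sc.parallelize(1 to 10, 2), { iter: Iterator[Int] => iter.size }, Seq(-1), false)
    }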