From 69316603d6bf11ecf1ea3dab63df178bad835e2d Mon Sep 17 00:00:00 2001 From: Reynold Xin <reynoldx@gmail.com> Date: Mon, 15 Jul 2013 22:50:11 -0700 Subject: [PATCH] Throw a more meaningful message when runJob is called to launch tasks on non-existent partitions. --- core/src/main/scala/spark/scheduler/DAGScheduler.scala | 9 +++++++++ core/src/test/scala/spark/RDDSuite.scala | 6 ++++++ 2 files changed, 15 insertions(+) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 3d3b9ea011..8173ef709d 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -251,6 +251,15 @@ class DAGScheduler( if (partitions.size == 0) { return } + + // Check to make sure we are not launching a task on a partition that does not exist. + val maxPartitions = finalRdd.partitions.length + partitions.find(p => p >= maxPartitions).foreach { p => + throw new IllegalArgumentException( + "Attempting to access a non-existent partition: " + p + ". " + + "Total number of partitions: " + maxPartitions) + } + val (toSubmit, waiter) = prepareJob( finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties) eventQueue.put(toSubmit) diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index aa3ee5f5ee..7f7d4c8211 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -302,4 +302,10 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements") } } + + test("runJob on an invalid partition") { + intercept[IllegalArgumentException] { + sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false) + } + } } -- GitLab