Skip to content
Snippets Groups Projects
Commit 8a8a8f2d authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #705 from rxin/errormessages

Throw a more meaningful message when runJob is called to launch tasks on non-existent partitions.
parents ed8415b2 69316603
No related branches found
No related tags found
No related merge requests found
...@@ -251,6 +251,15 @@ class DAGScheduler( ...@@ -251,6 +251,15 @@ class DAGScheduler(
if (partitions.size == 0) { if (partitions.size == 0) {
return return
} }
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = finalRdd.partitions.length
partitions.find(p => p >= maxPartitions).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
val (toSubmit, waiter) = prepareJob( val (toSubmit, waiter) = prepareJob(
finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties) finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties)
eventQueue.put(toSubmit) eventQueue.put(toSubmit)
......
...@@ -302,4 +302,10 @@ class RDDSuite extends FunSuite with SharedSparkContext { ...@@ -302,4 +302,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements") assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
} }
} }
test("runJob on an invalid partition") {
intercept[IllegalArgumentException] {
sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false)
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment