diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index c45b759f007cc83653fe90a87bf89965a5e50f2f..e7221e3032c11993bc375f2ce6e687d91f78730b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -258,7 +258,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc val pgroup = PartitionGroup(nxt_replica) groupArr += pgroup addPartToPGroup(nxt_part, pgroup) - groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple + groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple numCreated += 1 } } @@ -267,7 +267,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc var (nxt_replica, nxt_part) = rotIt.next() val pgroup = PartitionGroup(nxt_replica) groupArr += pgroup - groupHash.get(nxt_replica).get += pgroup + groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup var tries = 0 while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part nxt_part = rotIt.next()._2 diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 6ea045198e2ced7346c14c150f7e37f4ad10fc03..2924de112934c3a5ad07604d356965fafcaa196c 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -351,6 +351,20 @@ class RDDSuite extends FunSuite with SharedSparkContext { } } + // Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception + test("coalesced RDDs with locality, fail first pass") { + val initialPartitions = 1000 + val targetLen = 50 + val couponCount = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt // = 492 + + val blocks = (1 to initialPartitions).map { i => + (i, List(if (i > couponCount) "m2" else "m1")) + } + val data = sc.makeRDD(blocks) + val coalesced = data.coalesce(targetLen) + assert(coalesced.partitions.length == targetLen) + } + test("zipped RDDs") { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val zipped = nums.zip(nums.map(_ + 1.0))