From 7c23c0dc3ed721c95690fc49f435d9de6952523c Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Thu, 17 Jul 2014 01:01:14 -0700
Subject: [PATCH] [SPARK-2412] CoalescedRDD throws exception with certain pref
 locs

If the first pass of CoalescedRDD does not find the target number of locations AND the second pass finds new locations, an exception is thrown, as "groupHash.get(nxt_replica).get" is not valid.

The fix is just to add an ArrayBuffer to groupHash for that replica if it didn't already exist.

Author: Aaron Davidson <aaron@databricks.com>

Closes #1337 from aarondav/2412 and squashes the following commits:

f587b5d [Aaron Davidson] getOrElseUpdate
3ad8a3c [Aaron Davidson] [SPARK-2412] CoalescedRDD throws exception with certain pref locs
---
 .../scala/org/apache/spark/rdd/CoalescedRDD.scala  |  4 ++--
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 14 ++++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index c45b759f00..e7221e3032 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -258,7 +258,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
         val pgroup = PartitionGroup(nxt_replica)
         groupArr += pgroup
         addPartToPGroup(nxt_part, pgroup)
-        groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
+        groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple
         numCreated += 1
       }
     }
@@ -267,7 +267,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
       var (nxt_replica, nxt_part) = rotIt.next()
       val pgroup = PartitionGroup(nxt_replica)
       groupArr += pgroup
-      groupHash.get(nxt_replica).get += pgroup
+      groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
       var tries = 0
       while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
         nxt_part = rotIt.next()._2
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6ea045198e..2924de1129 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -351,6 +351,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  // Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception
+  test("coalesced RDDs with locality, fail first pass") {
+    val initialPartitions = 1000
+    val targetLen = 50
+    val couponCount = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt // = 492
+
+    val blocks = (1 to initialPartitions).map { i =>
+      (i, List(if (i > couponCount) "m2" else "m1"))
+    }
+    val data = sc.makeRDD(blocks)
+    val coalesced = data.coalesce(targetLen)
+    assert(coalesced.partitions.length == targetLen)
+  }
+
   test("zipped RDDs") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val zipped = nums.zip(nums.map(_ + 1.0))
-- 
GitLab