diff --git a/core/src/main/scala/spark/CoalescedRDD.scala b/core/src/main/scala/spark/CoalescedRDD.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a96f749543979e5df64e7d88c69d56d3eb669322
--- /dev/null
+++ b/core/src/main/scala/spark/CoalescedRDD.scala
@@ -0,0 +1,44 @@
+package spark
+
+private class CoalescedRDDSplit(val index: Int, val parents: Array[Split]) extends Split
+
+/**
+ * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
+ * this RDD computes zero or more of the parent ones. Will produce exactly `maxPartitions` if the
+ * parent had more than this many partitions, or fewer if the parent had fewer.
+ *
+ * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
+ * or to avoid having a large number of small tasks when processing a directory with many files.
+ */
+class CoalescedRDD[T: ClassManifest](prev: RDD[T], maxPartitions: Int)
+  extends RDD[T](prev.context)
+  with Logging {
+
+  @transient val splits_ : Array[Split] = {
+    val prevSplits = prev.splits
+    if (prevSplits.length < maxPartitions) {
+      prevSplits.zipWithIndex.map{ case (s, idx) => new CoalescedRDDSplit(idx, Array(s)) }
+    } else {
+      (0 until maxPartitions).map { i =>
+        val rangeStart = (i * prevSplits.length) / maxPartitions
+        val rangeEnd = ((i + 1) * prevSplits.length) / maxPartitions
+        new CoalescedRDDSplit(i, prevSplits.slice(rangeStart, rangeEnd))
+      }.toArray
+    }
+  }
+
+  override def splits = splits_
+
+  override def compute(split: Split): Iterator[T] = {
+    split.asInstanceOf[CoalescedRDDSplit].parents.iterator.flatMap {
+      parentSplit => prev.iterator(parentSplit)
+    }
+  }
+
+  val dependencies = List(
+    new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] =
+        splits(id).asInstanceOf[CoalescedRDDSplit].parents.map(_.index)
+    }
+  )
+}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 04dbe3a3e4e389cf452d9298c6696a46c2f3d1e1..961d05bc82512cd158be44c137c4268d9256c540 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -71,4 +71,35 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).flatMap(x => 1 to x).checkpoint()
     assert(rdd.collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
   }
+
+  test("coalesced RDDs") {
+    sc = new SparkContext("local", "test")
+    val data = sc.parallelize(1 to 10, 10)
+
+    val coalesced1 = new CoalescedRDD(data, 2)
+    assert(coalesced1.collect().toList === (1 to 10).toList)
+    assert(coalesced1.glom().collect().map(_.toList).toList ===
+      List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
+
+    // Check that the narrow dependency is also specified correctly
+    assert(coalesced1.dependencies.head.getParents(0).toList === List(0, 1, 2, 3, 4))
+    assert(coalesced1.dependencies.head.getParents(1).toList === List(5, 6, 7, 8, 9))
+
+    val coalesced2 = new CoalescedRDD(data, 3)
+    assert(coalesced2.collect().toList === (1 to 10).toList)
+    assert(coalesced2.glom().collect().map(_.toList).toList ===
+      List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
+
+    val coalesced3 = new CoalescedRDD(data, 10)
+    assert(coalesced3.collect().toList === (1 to 10).toList)
+    assert(coalesced3.glom().collect().map(_.toList).toList ===
+      (1 to 10).map(x => List(x)).toList)
+
+    // If we try to coalesce into more partitions than the original RDD, it should just
+    // keep the original number of partitions.
+    val coalesced4 = new CoalescedRDD(data, 20)
+    assert(coalesced4.collect().toList === (1 to 10).toList)
+    assert(coalesced4.glom().collect().map(_.toList).toList ===
+      (1 to 10).map(x => List(x)).toList)
+  }
 }
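Note on the partition grouping: the contiguous-range arithmetic in splits_ can be checked in isolation. The sketch below is plain Scala with no Spark dependency; CoalesceRanges and groupIndices are hypothetical names, not part of this patch. It reproduces the rangeStart/rangeEnd computation and prints the same groupings the test asserts: 10 parents into 2 groups of 5 each, and 10 parents into 3 groups of sizes 3, 3, and 4.

object CoalesceRanges {
  // Same arithmetic as splits_: assign numParents parent-partition indices
  // to maxPartitions contiguous, nearly equal ranges.
  def groupIndices(numParents: Int, maxPartitions: Int): Seq[Range] =
    (0 until maxPartitions).map { i =>
      val rangeStart = (i * numParents) / maxPartitions
      val rangeEnd = ((i + 1) * numParents) / maxPartitions
      rangeStart until rangeEnd
    }

  def main(args: Array[String]) {
    println(groupIndices(10, 2).map(_.toList))  // List(0..4), List(5..9)
    println(groupIndices(10, 3).map(_.toList))  // List(0..2), List(3..5), List(6..9)
  }
}

Integer division makes the group sizes differ by at most one, and every parent index lands in exactly one range because consecutive rangeEnd and rangeStart values coincide.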
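For the use case the class comment describes (shrinking an RDD after a selective filter), a minimal driver sketch might look like the following. It assumes the same 0.x-era API the test exercises; CoalesceExample is a hypothetical object, and CoalescedRDD is constructed directly since this patch adds no coalesce() convenience method on RDD.

import spark.{CoalescedRDD, SparkContext}

object CoalesceExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "coalesce-example")
    // 100 small partitions; a selective filter leaves most of them nearly empty.
    val filtered = sc.parallelize(1 to 1000, 100).filter(_ % 100 == 0)
    // Collapse to 4 partitions; each computes a contiguous range of ~25
    // parents through the narrow dependency, so no shuffle is involved.
    val coalesced = new CoalescedRDD(filtered, 4)
    println(coalesced.glom().collect().map(_.size).mkString(", "))  // per-partition sizes
    sc.stop()
  }
}

Because the dependency is narrow, each coalesced task simply iterates over its parent partitions in place, which is what compute's flatMap over parents implements.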