diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 9d6ea782bd83c000f5c7d666018a2f2587e3759d..f0bc85865c5e324270ff3201dc8d0ff6feba00d8 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -20,6 +20,7 @@ import spark.partial.BoundedDouble import spark.partial.CountEvaluator import spark.partial.GroupedCountEvaluator import spark.partial.PartialResult +import spark.rdd.CoalescedRDD import spark.rdd.CartesianRDD import spark.rdd.FilteredRDD import spark.rdd.FlatMappedRDD @@ -231,6 +232,12 @@ abstract class RDD[T: ClassManifest]( def distinct(): RDD[T] = distinct(splits.size) + /** + * Return a new RDD reduced into at most `numSplits` partitions (default: `sc.defaultParallelism`). + */ + def coalesce(numSplits: Int = sc.defaultParallelism): RDD[T] = + new CoalescedRDD(this, numSplits) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 60025b459c383168f694b636334e85663a5b1522..295eaa57c0c2e609a542220a8d0d168477864e23 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -130,6 +130,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest, other.classManifest) + /** + * Return a new RDD coalesced using `rdd.context.defaultParallelism` as the target partition count. + */ + def coalesce(): RDD[T] = coalesce(rdd.context.defaultParallelism) + + /** + * Return a new RDD reduced into at most `numSplits` partitions. NOTE(review): returns `RDD[T]`, not `This`; confirm. + */ + def coalesce(numSplits: Int): RDD[T] = rdd.coalesce(numSplits) + /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. 
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index 0b74607fb85a6a5d0456b58744eba49bc1f98585..0d08fd239632b286b87848710f089684a9da5493 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -114,12 +114,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } test("CoalescedRDD") { - testCheckpointing(new CoalescedRDD(_, 2)) + testCheckpointing(_.coalesce(2)) // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed // Current implementation of CoalescedRDDSplit has transient reference to parent RDD, // so only the RDD will reduce in serialized size, not the splits. - testParentCheckpointing(new CoalescedRDD(_, 2), true, false) + testParentCheckpointing(_.coalesce(2), true, false) // Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits. 
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index fe7deb10d63b001ca5a3db3e9398dcb38c233126..ffa866de7532fc4bae950edfda3e34d40590c7d9 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -122,7 +122,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val data = sc.parallelize(1 to 10, 10) - val coalesced1 = new CoalescedRDD(data, 2) + val coalesced1 = data.coalesce(2) assert(coalesced1.collect().toList === (1 to 10).toList) assert(coalesced1.glom().collect().map(_.toList).toList === List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10))) @@ -133,19 +133,19 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(1).toList === List(5, 6, 7, 8, 9)) - val coalesced2 = new CoalescedRDD(data, 3) + val coalesced2 = data.coalesce(3) assert(coalesced2.collect().toList === (1 to 10).toList) assert(coalesced2.glom().collect().map(_.toList).toList === List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10))) - val coalesced3 = new CoalescedRDD(data, 10) + val coalesced3 = data.coalesce(10) assert(coalesced3.collect().toList === (1 to 10).toList) assert(coalesced3.glom().collect().map(_.toList).toList === (1 to 10).map(x => List(x)).toList) // If we try to coalesce into more partitions than the original RDD, it should just // keep the original number of partitions. - val coalesced4 = new CoalescedRDD(data, 20) + val coalesced4 = data.coalesce(20) assert(coalesced4.collect().toList === (1 to 10).toList) assert(coalesced4.glom().collect().map(_.toList).toList === (1 to 10).map(x => List(x)).toList)