Skip to content
Snippets Groups Projects
Commit 3cc8ab6e authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #541 from stephenh/shufflecoalesce

Add a shuffle parameter to coalesce.
parents cad507ad dd854d5b
No related branches found
No related tags found
No related merge requests found
......@@ -31,6 +31,7 @@ import spark.rdd.MapPartitionsRDD
import spark.rdd.MapPartitionsWithIndexRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
import spark.rdd.ShuffledRDD
import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
......@@ -237,7 +238,14 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
def coalesce(numPartitions: Int): RDD[T] = new CoalescedRDD(this, numPartitions)
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
if (shuffle) {
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys
} else {
new CoalescedRDD(this, numPartitions)
}
}
/**
* Return a sampled subset of this RDD.
......
......@@ -57,6 +57,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
*/
def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD =
fromRDD(srdd.coalesce(numPartitions, shuffle))
/**
* Return an RDD with the elements from `this` that are not in `other`.
*
......
......@@ -66,7 +66,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions))
def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions))
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] =
fromRDD(rdd.coalesce(numPartitions, shuffle))
/**
* Return a sampled subset of this RDD.
......
......@@ -43,6 +43,12 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
rdd.coalesce(numPartitions, shuffle)
/**
* Return a sampled subset of this RDD.
*/
......
......@@ -3,7 +3,7 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import spark.SparkContext._
import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD}
import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
class RDDSuite extends FunSuite with LocalSparkContext {
......@@ -184,6 +184,11 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(coalesced4.collect().toList === (1 to 10).toList)
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
// we can optionally shuffle to keep the upstream parallel
val coalesced5 = data.coalesce(1, shuffle = true)
assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _]] !=
null)
}
test("zipped RDDs") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment