Skip to content
Snippets Groups Projects
Commit a85758c2 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #907 from stephenh/document_coalesce_shuffle

Add better docs for coalesce.
parents 084fc369 59003d38
No related branches found
No related tags found
No related merge requests found
......@@ -267,6 +267,23 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* Note: With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
if (shuffle) {
......
......@@ -140,7 +140,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(rdd.union(emptyKv).collect().size === 2)
}
test("cogrouped RDDs") {
test("coalesced RDDs") {
val data = sc.parallelize(1 to 10, 10)
val coalesced1 = data.coalesce(2)
......@@ -175,8 +175,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val coalesced5 = data.coalesce(1, shuffle = true)
assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
null)
// when shuffling, we can increase the number of partitions
val coalesced6 = data.coalesce(20, shuffle = true)
assert(coalesced6.partitions.size === 20)
assert(coalesced6.collect().toSet === (1 to 10).toSet)
}
test("cogrouped RDDs with locality") {
test("coalesced RDDs with locality") {
val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
val coal3 = data3.coalesce(3)
val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
......@@ -197,11 +203,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val coalesced4 = data.coalesce(20)
val listOfLists = coalesced4.glom().collect().map(_.toList).toList
val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
assert( sortedList === (1 to 9).
assert(sortedList === (1 to 9).
map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
}
test("cogrouped RDDs with locality, large scale (10K partitions)") {
test("coalesced RDDs with locality, large scale (10K partitions)") {
// large scale experiment
import collection.mutable
val rnd = scala.util.Random
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment