Skip to content
Snippets Groups Projects
Commit 1ad1331a authored by Reynold Xin
Browse files

Added MapPartitionsWithSplitRDD.

parent 58eb44ac
No related branches found
No related tags found
No related merge requests found
......@@ -196,6 +196,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
/**
 * Builds a MapPartitionsRDD on top of this RDD, so that `f` is invoked with the
 * iterator of each split and its result becomes that split's contents.
 *
 * The closure is passed through `sc.clean` before being handed to the new RDD
 * (presumably to prepare it for shipping to workers -- confirm against
 * SparkContext.clean).
 *
 * @param f function from a split's element iterator to the output iterator
 * @return the derived RDD of `U`
 */
def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] = {
  val cleanedFn: Iterator[T] => Iterator[U] = sc.clean(f)
  new MapPartitionsRDD(this, cleanedFn)
}
/**
 * Builds a MapPartitionsWithSplitRDD on top of this RDD, so that `f` is invoked
 * with a split's index together with that split's element iterator.
 *
 * The closure is passed through `sc.clean` before being handed to the new RDD
 * (presumably to prepare it for shipping to workers -- confirm against
 * SparkContext.clean).
 *
 * @param f function from (split index, split's element iterator) to the
 *          output iterator
 * @return the derived RDD of `U`
 */
def mapPartitionsWithSplit[U: ClassManifest](f: (Int, Iterator[T]) => Iterator[U]): RDD[U] = {
  val cleanedFn: (Int, Iterator[T]) => Iterator[U] = sc.clean(f)
  new MapPartitionsWithSplitRDD(this, cleanedFn)
}
// Actions (launch a job to return a value to the user program)
def foreach(f: T => Unit) {
......@@ -417,3 +420,18 @@ class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = f(prev.iterator(split))
}
/**
 * An RDD derived from a parent RDD by running a closure over each split, where
 * the closure receives the split's index in addition to the split's iterator.
 * Having the index available lets the closure produce or gather per-partition
 * information, for example how many tuples a given partition holds.
 *
 * @param prev the parent RDD whose splits are reused
 * @param f    function from (split index, parent iterator for that split) to
 *             the iterator of output elements
 */
class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
    prev: RDD[T],
    f: (Int, Iterator[T]) => Iterator[U])
  extends RDD[U](prev.context) {

  // The parent's splits are exposed unchanged.
  override def splits = prev.splits

  // A single one-to-one dependency on the parent RDD.
  override val dependencies = List(new OneToOneDependency(prev))

  // Read the index first, then obtain the parent's iterator for the split,
  // and finally hand both to the user closure.
  override def compute(split: Split): Iterator[U] = {
    val splitIndex = split.index
    val parentElements = prev.iterator(split)
    f(splitIndex, parentElements)
  }
}
......@@ -29,6 +29,11 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
assert(partitionSums.collect().toList === List(3, 7))
val partitionSumsWithSplit = nums.mapPartitionsWithSplit {
case(split, iter) => Iterator((split, iter.reduceLeft(_ + _)))
}
assert(partitionSumsWithSplit.collect().toList === List((0, 3), (1, 7)))
}
test("SparkContext.union") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment