diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index bded55238f8ce85b1b26179396d3dc985f286376..4310f745f37d492d447d4f4054196b76a20ffa80 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -35,9 +35,9 @@ import spark.rdd.ShuffledRDD
 import spark.rdd.SubtractedRDD
 import spark.rdd.UnionRDD
 import spark.rdd.ZippedRDD
-import spark.rdd.MapZippedPartitionsRDD2
-import spark.rdd.MapZippedPartitionsRDD3
-import spark.rdd.MapZippedPartitionsRDD4
+import spark.rdd.ZippedPartitionsRDD2
+import spark.rdd.ZippedPartitionsRDD3
+import spark.rdd.ZippedPartitionsRDD4
 import spark.storage.StorageLevel
 
 import SparkContext._
@@ -441,21 +441,21 @@ abstract class RDD[T: ClassManifest](
 
   def zipPartitions[B: ClassManifest, V: ClassManifest](
       f: (Iterator[T], Iterator[B]) => Iterator[V],
-      rdd2: RDD[B]) =
-    new MapZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
+      rdd2: RDD[B]): RDD[V] =
+    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
 
   def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
       f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
       rdd2: RDD[B],
-      rdd3: RDD[C]) =
-    new MapZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
+      rdd3: RDD[C]): RDD[V] =
+    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
 
   def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
       f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
       rdd2: RDD[B],
       rdd3: RDD[C],
-      rdd4: RDD[D]) =
-    new MapZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
+      rdd4: RDD[D]): RDD[V] =
+    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
 
 
   // Actions (launch a job to return a value to the user program)
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
index 3520fd24b0fd68f46ae112729121e08ee1b82812..b3113c1969182d55b6655e0e0b78c765ba7fef3e 100644
--- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
@@ -3,7 +3,7 @@ package spark.rdd
 import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
 import java.io.{ObjectOutputStream, IOException}
 
-private[spark] class MapZippedPartition(
+private[spark] class ZippedPartitions(
     idx: Int,
     @transient rdds: Seq[RDD[_]])
   extends Partition {
@@ -20,7 +20,7 @@ private[spark] class MapZippedPartition(
   }
 }
 
-abstract class MapZippedPartitionsBaseRDD[V: ClassManifest](
+abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
     sc: SparkContext,
     var rdds: Seq[RDD[_]])
   extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) {
@@ -32,13 +32,13 @@ abstract class MapZippedPartitionsBaseRDD[V: ClassManifest](
     }
     val array = new Array[Partition](sizes(0))
     for (i <- 0 until sizes(0)) {
-      array(i) = new MapZippedPartition(i, rdds)
+      array(i) = new ZippedPartitions(i, rdds)
     }
     array
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] = {
-    val splits = s.asInstanceOf[MapZippedPartition].partitions
+    val splits = s.asInstanceOf[ZippedPartitions].partitions
     val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2))
     preferredLocations.reduce((x, y) => x.intersect(y))
   }
@@ -49,15 +49,15 @@ abstract class MapZippedPartitionsBaseRDD[V: ClassManifest](
   }
 }
 
-class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest](
+class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest](
     sc: SparkContext,
     f: (Iterator[A], Iterator[B]) => Iterator[V],
     var rdd1: RDD[A],
     var rdd2: RDD[B])
-  extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
-    val partitions = s.asInstanceOf[MapZippedPartition].partitions
+    val partitions = s.asInstanceOf[ZippedPartitions].partitions
     f(rdd1.iterator(partitions(0), context),
       rdd2.iterator(partitions(1), context))
   }
@@ -68,17 +68,17 @@ class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManife
   }
 }
 
-class MapZippedPartitionsRDD3
+class ZippedPartitionsRDD3
   [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
     sc: SparkContext,
     f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
     var rdd1: RDD[A],
     var rdd2: RDD[B],
     var rdd3: RDD[C])
-  extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) {
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
-    val partitions = s.asInstanceOf[MapZippedPartition].partitions
+    val partitions = s.asInstanceOf[ZippedPartitions].partitions
     f(rdd1.iterator(partitions(0), context),
       rdd2.iterator(partitions(1), context),
       rdd3.iterator(partitions(2), context))
@@ -92,7 +92,7 @@ class MapZippedPartitionsRDD3
   }
 }
 
-class MapZippedPartitionsRDD4
+class ZippedPartitionsRDD4
   [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
     sc: SparkContext,
     f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
@@ -100,10 +100,10 @@ class MapZippedPartitionsRDD4
     var rdd2: RDD[B],
     var rdd3: RDD[C],
     var rdd4: RDD[D])
-  extends MapZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) {
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
-    val partitions = s.asInstanceOf[MapZippedPartition].partitions
+    val partitions = s.asInstanceOf[ZippedPartitions].partitions
     f(rdd1.iterator(partitions(0), context),
       rdd2.iterator(partitions(1), context),
       rdd3.iterator(partitions(2), context),
diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
index 834b517cbc00060ec31bfd29eeed69adcf7b5464..5f60aa75d7f0334a4d99662e04abc9d785c47b4d 100644
--- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala
+++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
@@ -11,20 +11,20 @@ import org.scalacheck.Prop._
 
 import SparkContext._
 
-object MapZippedPartitionsSuite {
+object ZippedPartitionsSuite {
   def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = {
     Iterator(i.toArray.size, s.toArray.size, d.toArray.size)
   }
 }
 
-class MapZippedPartitionsSuite extends FunSuite with LocalSparkContext {
+class ZippedPartitionsSuite extends FunSuite with LocalSparkContext {
   test("print sizes") {
     sc = new SparkContext("local", "test")
     val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
     val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
 
-    val zippedRDD = data1.zipPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3)
+    val zippedRDD = data1.zipPartitions(ZippedPartitionsSuite.procZippedData, data2, data3)
 
     val obtainedSizes = zippedRDD.collect()
     val expectedSizes = Array(2, 3, 1, 2, 3, 1)
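
A minimal usage sketch of the renamed API, not part of the patch above: it mirrors the test in ZippedPartitionsSuite, and the object name ZipPartitionsExample, the sample data, and the app name are illustrative assumptions only.

import spark.SparkContext

object ZipPartitionsExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "zip-partitions-example")  // hypothetical local context

    val ints    = sc.makeRDD(Array(1, 2, 3, 4), 2)                      // 2 partitions of Int
    val strings = sc.makeRDD(Array("a", "b", "c", "d", "e", "f"), 2)    // 2 partitions of String

    // zipPartitions takes the per-partition function first, then the other RDD(s),
    // and after this patch returns RDD[V] explicitly (backed by ZippedPartitionsRDD2).
    val sizes = ints.zipPartitions(
      (i: Iterator[Int], s: Iterator[String]) => Iterator(i.size, s.size),
      strings)

    println(sizes.collect().mkString(", "))  // per-partition element counts
    sc.stop()
  }
}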