diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 8e7e1457c14a3097747e5c5175c8aaccd3ed8b9c..bded55238f8ce85b1b26179396d3dc985f286376 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -439,22 +439,22 @@ abstract class RDD[T: ClassManifest](
    */
   def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
 
-  def zipAndMapPartitions[B: ClassManifest, V: ClassManifest](
-    f: (Iterator[T], Iterator[B]) => Iterator[V],
-    rdd2: RDD[B]) =
+  def zipPartitions[B: ClassManifest, V: ClassManifest](
+      f: (Iterator[T], Iterator[B]) => Iterator[V],
+      rdd2: RDD[B]) =
     new MapZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
 
-  def zipAndMapPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
-    f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
-    rdd2: RDD[B],
-    rdd3: RDD[C]) =
+  def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
+      f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
+      rdd2: RDD[B],
+      rdd3: RDD[C]) =
     new MapZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
 
-  def zipAndMapPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
-    f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
-    rdd2: RDD[B],
-    rdd3: RDD[C],
-    rdd4: RDD[D]) =
+  def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
+      f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
+      rdd2: RDD[B],
+      rdd3: RDD[C],
+      rdd4: RDD[D]) =
     new MapZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
diff --git a/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala
index 6653b3b44451e982156a3547be67e6c4461ec6b7..3520fd24b0fd68f46ae112729121e08ee1b82812 100644
--- a/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala
@@ -4,13 +4,13 @@ import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
 import java.io.{ObjectOutputStream, IOException}
 
 private[spark] class MapZippedPartition(
-  idx: Int,
-  @transient rdds: Seq[RDD[_]]
-  ) extends Partition {
+    idx: Int,
+    @transient rdds: Seq[RDD[_]])
+  extends Partition {
 
   override val index: Int = idx
   var partitionValues = rdds.map(rdd => rdd.partitions(idx))
-  def partitions = partitionValues 
+  def partitions = partitionValues
 
   @throws(classOf[IOException])
   private def writeObject(oos: ObjectOutputStream) {
@@ -68,7 +68,8 @@ class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManife
   }
 }
 
-class MapZippedPartitionsRDD3[A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
+class MapZippedPartitionsRDD3
+  [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
     sc: SparkContext,
     f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
     var rdd1: RDD[A],
@@ -78,8 +79,8 @@ class MapZippedPartitionsRDD3[A: ClassManifest, B: ClassManifest, C: ClassManife
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
     val partitions = s.asInstanceOf[MapZippedPartition].partitions
-    f(rdd1.iterator(partitions(0), context), 
-      rdd2.iterator(partitions(1), context), 
+    f(rdd1.iterator(partitions(0), context),
+      rdd2.iterator(partitions(1), context),
       rdd3.iterator(partitions(2), context))
   }
 
@@ -91,7 +92,8 @@ class MapZippedPartitionsRDD3[A: ClassManifest, B: ClassManifest, C: ClassManife
   }
 }
 
-class MapZippedPartitionsRDD4[A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
+class MapZippedPartitionsRDD4
+  [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
     sc: SparkContext,
     f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
     var rdd1: RDD[A],
@@ -102,8 +104,8 @@ class MapZippedPartitionsRDD4[A: ClassManifest, B: ClassManifest, C: ClassManife
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
     val partitions = s.asInstanceOf[MapZippedPartition].partitions
-    f(rdd1.iterator(partitions(0), context), 
-      rdd2.iterator(partitions(1), context), 
+    f(rdd1.iterator(partitions(0), context),
+      rdd2.iterator(partitions(1), context),
       rdd3.iterator(partitions(2), context),
       rdd4.iterator(partitions(3), context))
   }
diff --git a/core/src/test/scala/spark/MapZippedPartitionsSuite.scala b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala
index f65a6464169e2da118958ee3494b741022f4bd8d..834b517cbc00060ec31bfd29eeed69adcf7b5464 100644
--- a/core/src/test/scala/spark/MapZippedPartitionsSuite.scala
+++ b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala
@@ -24,7 +24,7 @@ class MapZippedPartitionsSuite extends FunSuite with LocalSparkContext {
     val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
     val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
 
-    val zippedRDD = data1.zipAndMapPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3)
+    val zippedRDD = data1.zipPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3)
 
     val obtainedSizes = zippedRDD.collect()
     val expectedSizes = Array(2, 3, 1, 2, 3, 1)
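
For reference, a minimal usage sketch of the renamed zipPartitions method in the two-RDD form (not part of the patch; the RDD names and data are made up, and a live SparkContext named sc is assumed):

    // Two RDDs with the same number of partitions (two partitions of three elements each).
    val rdd1 = sc.makeRDD(1 to 6, 2)
    val rdd2 = sc.makeRDD(Seq("a", "b", "c", "d", "e", "f"), 2)

    // zipPartitions pairs corresponding partitions and hands both iterators to the
    // function at once; here each pair is mapped to the sizes of its two iterators.
    val sizes = rdd1.zipPartitions(
      (i1: Iterator[Int], i2: Iterator[String]) => Iterator(i1.size, i2.size),
      rdd2)

    sizes.collect()  // Array(3, 3, 3, 3)

As in the diff above, the function argument comes first and the additional RDDs follow, with overloads for two, three, and four input RDDs.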