diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index e80ec17aa5e0a4bbffd94b835fb18cff597c1f00..35b0e06785cf370b04a718b4874cf2302bdf99c3 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -10,17 +10,17 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
     @transient rdd2: RDD[U]
   ) extends Partition {
 
-  var split1 = rdd1.partitions(idx)
-  var split2 = rdd1.partitions(idx)
+  var partition1 = rdd1.partitions(idx)
+  var partition2 = rdd2.partitions(idx)
   override val index: Int = idx
 
-  def splits = (split1, split2)
+  def partitions = (partition1, partition2)
 
   @throws(classOf[IOException])
   private def writeObject(oos: ObjectOutputStream) {
-    // Update the reference to parent split at the time of task serialization
-    split1 = rdd1.partitions(idx)
-    split2 = rdd2.partitions(idx)
+    // Update the reference to parent partition at the time of task serialization
+    partition1 = rdd1.partitions(idx)
+    partition2 = rdd2.partitions(idx)
     oos.defaultWriteObject()
   }
 }
@@ -43,13 +43,13 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
   }
 
   override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = {
-    val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits
-    rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context))
+    val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+    rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context))
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] = {
-    val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits
-    rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2))
+    val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+    rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
   }
 
   override def clearDependencies() {
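
For context, ZippedRDD is the implementation behind RDD.zip, which pairs corresponding elements of two RDDs partition by partition. Besides the rename from "split" to "partition", the change above also fixes partition2 being initialized from rdd1's partitions instead of rdd2's. Below is a minimal usage sketch, assuming a local SparkContext and the pre-Apache "spark" package layout used in this tree; the object and app names are illustrative, not part of this change:

import spark.SparkContext

object ZipExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "ZipExample")
    // zip requires both RDDs to have the same number of partitions
    // and the same number of elements in each partition.
    val nums  = sc.parallelize(1 to 4, 2)
    val chars = sc.parallelize(Seq("a", "b", "c", "d"), 2)
    val pairs = nums.zip(chars) // builds a ZippedRDD under the hood
    println(pairs.collect().mkString(", ")) // (1,a), (2,b), (3,c), (4,d)
    sc.stop()
  }
}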