Skip to content
Snippets Groups Projects
Commit 054feb64 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Fixed a bug with zip

parent b5900d47
No related branches found
No related tags found
No related merge requests found
...@@ -10,17 +10,17 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest]( ...@@ -10,17 +10,17 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
@transient rdd2: RDD[U] @transient rdd2: RDD[U]
) extends Partition { ) extends Partition {
var split1 = rdd1.partitions(idx) var partition1 = rdd1.partitions(idx)
var split2 = rdd1.partitions(idx) var partition2 = rdd2.partitions(idx)
override val index: Int = idx override val index: Int = idx
def splits = (split1, split2) def partitions = (partition1, partition2)
@throws(classOf[IOException]) @throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) { private def writeObject(oos: ObjectOutputStream) {
// Update the reference to parent split at the time of task serialization // Update the reference to parent partition at the time of task serialization
split1 = rdd1.partitions(idx) partition1 = rdd1.partitions(idx)
split2 = rdd2.partitions(idx) partition2 = rdd2.partitions(idx)
oos.defaultWriteObject() oos.defaultWriteObject()
} }
} }
...@@ -43,13 +43,13 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( ...@@ -43,13 +43,13 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
} }
override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = { override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = {
val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context)) rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context))
} }
override def getPreferredLocations(s: Partition): Seq[String] = { override def getPreferredLocations(s: Partition): Seq[String] = {
val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2)) rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
} }
override def clearDependencies() { override def clearDependencies() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment