From f639b65eabcc8666b74af8f13a37c5fdf7e0185f Mon Sep 17 00:00:00 2001
From: Matthew Taylor <matthew.t@tbfe.net>
Date: Tue, 19 Nov 2013 10:48:48 +0000
Subject: [PATCH] PartitionPruningRDD is using index from parent (review
 changes)

---
 .../apache/spark/rdd/PartitionPruningRDD.scala     |  4 ++--
 .../{ => rdd}/PartitionPruningRDDSuite.scala       | 15 ++++-----------
 2 files changed, 6 insertions(+), 13 deletions(-)
 rename core/src/test/scala/org/apache/spark/{ => rdd}/PartitionPruningRDDSuite.scala (92%)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index 2738a00894..574dd4233f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -33,8 +33,8 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
   extends NarrowDependency[T](rdd) {
 
   @transient
-  val partitions: Array[Partition] = rdd.partitions.zipWithIndex
-    .filter(s => partitionFilterFunc(s._2)).map(_._1).zipWithIndex
+  val partitions: Array[Partition] = rdd.partitions
+    .filter(s => partitionFilterFunc(s.index)).zipWithIndex
     .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
 
   override def getParents(partitionId: Int) = {
diff --git a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
similarity index 92%
rename from core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
rename to core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
index 28e71e835f..53a7b7c44d 100644
--- a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
@@ -15,11 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.spark
+package org.apache.spark.rdd
 
 import org.scalatest.FunSuite
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.{PartitionPruningRDDPartition, RDD, PartitionPruningRDD}
+import org.apache.spark.{TaskContext, Partition, SharedSparkContext}
 
 
 class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
@@ -49,7 +48,7 @@ class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
   }
 
 
-  test("Pruned Partitions can be merged ") {
+  test("Pruned Partitions can be unioned ") {
 
     val rdd = new RDD[Int](sc, Nil) {
       override protected def getPartitions = {
@@ -72,17 +71,11 @@ class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
       })
 
     val merged = prunedRDD1 ++ prunedRDD2
-
     assert(merged.count() == 2)
     val take = merged.take(2)
-
     assert(take.apply(0) == 4)
-
     assert(take.apply(1) == 6)
-
-
   }
-
 }
 
 class TestPartition(i: Int, value: Int) extends Partition with Serializable {
@@ -90,4 +83,4 @@ class TestPartition(i: Int, value: Int) extends Partition with Serializable {
 
   def testValue = this.value
 
-}
\ No newline at end of file
+}
-- 
GitLab
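
The production change is behaviour-preserving: since a partition's index field matches its position in rdd.partitions, filtering on s.index selects the same splits as the old zipWithIndex pipeline, while stating the contract directly — the filter function receives each parent partition's own index, and the surviving splits are renumbered from zero. The renamed test pins down why that renumbering matters: two separately pruned RDDs can be unioned, because each presents consistent zero-based partition indices while getParents still maps back to the original parent indices.

A minimal sketch of the same scenario outside the suite, assuming a local SparkContext and the public PartitionPruningRDD.create helper; the object name PruningUnionSketch and the data layout are illustrative, not taken from the patch:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.PartitionPruningRDD

    // Illustrative driver program, not part of the patch.
    object PruningUnionSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "pruning-union-sketch")
        try {
          // Ten single-element partitions: partition i holds the value i * 2.
          val rdd = sc.parallelize(0 until 10, 10).map(_ * 2)

          // The filter function receives each parent partition's own `index`,
          // which is what the patched PruneDependency now reads directly.
          val prunedRDD1 = PartitionPruningRDD.create(rdd, _ == 2) // keeps the value 4
          val prunedRDD2 = PartitionPruningRDD.create(rdd, _ == 3) // keeps the value 6

          // Each pruned RDD renumbers its surviving split to index 0, so the
          // union sees consistent zero-based partition indices on both sides.
          val merged = prunedRDD1 ++ prunedRDD2
          assert(merged.count() == 2)
          assert(merged.collect().sorted.sameElements(Array(4, 6)))
        } finally {
          sc.stop()
        }
      }
    }

If the pruned RDDs instead carried their parents' original indices 2 and 3, operations that index into the partitions array positionally (count, take, union above) could address the wrong split — the regression the renamed test guards against.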