Skip to content
Snippets Groups Projects
Commit f639b65e authored by Matthew Taylor's avatar Matthew Taylor
Browse files

PartitionPruningRDD is using index from parent(review changes)

parent 13b9bf49
No related branches found
No related tags found
No related merge requests found
...@@ -33,8 +33,8 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo ...@@ -33,8 +33,8 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
extends NarrowDependency[T](rdd) { extends NarrowDependency[T](rdd) {
@transient @transient
val partitions: Array[Partition] = rdd.partitions.zipWithIndex val partitions: Array[Partition] = rdd.partitions
.filter(s => partitionFilterFunc(s._2)).map(_._1).zipWithIndex .filter(s => partitionFilterFunc(s.index)).zipWithIndex
.map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition } .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
override def getParents(partitionId: Int) = { override def getParents(partitionId: Int) = {
......
...@@ -15,11 +15,10 @@ ...@@ -15,11 +15,10 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.spark package org.apache.spark.rdd
import org.scalatest.FunSuite import org.scalatest.FunSuite
import org.apache.spark.SparkContext._ import org.apache.spark.{TaskContext, Partition, SharedSparkContext}
import org.apache.spark.rdd.{PartitionPruningRDDPartition, RDD, PartitionPruningRDD}
class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext { class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
...@@ -49,7 +48,7 @@ class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext { ...@@ -49,7 +48,7 @@ class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
} }
test("Pruned Partitions can be merged ") { test("Pruned Partitions can be unioned ") {
val rdd = new RDD[Int](sc, Nil) { val rdd = new RDD[Int](sc, Nil) {
override protected def getPartitions = { override protected def getPartitions = {
...@@ -72,17 +71,11 @@ class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext { ...@@ -72,17 +71,11 @@ class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
}) })
val merged = prunedRDD1 ++ prunedRDD2 val merged = prunedRDD1 ++ prunedRDD2
assert(merged.count() == 2) assert(merged.count() == 2)
val take = merged.take(2) val take = merged.take(2)
assert(take.apply(0) == 4) assert(take.apply(0) == 4)
assert(take.apply(1) == 6) assert(take.apply(1) == 6)
} }
} }
class TestPartition(i: Int, value: Int) extends Partition with Serializable { class TestPartition(i: Int, value: Int) extends Partition with Serializable {
...@@ -90,4 +83,4 @@ class TestPartition(i: Int, value: Int) extends Partition with Serializable { ...@@ -90,4 +83,4 @@ class TestPartition(i: Int, value: Int) extends Partition with Serializable {
def testValue = this.value def testValue = this.value
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment