diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 3d93ff33bbf7be0f3244731642c1215a65cc5aed..e0d2eabb1dd65e628bb9a8cebb06016bd32a013b 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -40,7 +40,6 @@ import spark.rdd.MapPartitionsRDD
 import spark.rdd.MapPartitionsWithSplitRDD
 import spark.rdd.PipedRDD
 import spark.rdd.SampledRDD
-import spark.rdd.SplitsPruningRDD
 import spark.rdd.UnionRDD
 import spark.rdd.ZippedRDD
 import spark.storage.StorageLevel
@@ -544,15 +543,6 @@ abstract class RDD[T: ClassManifest](
     map(x => (f(x), x))
   }
 
-  /**
-   * Prune splits (partitions) so Spark can avoid launching tasks on
-   * all splits. An example use case: If we know the RDD is partitioned by range,
-   * and the execution DAG has a filter on the key, we can avoid launching tasks
-   * on splits that don't have the range covering the key.
-   */
-  def pruneSplits(splitsFilterFunc: Int => Boolean): RDD[T] =
-    new SplitsPruningRDD(this, splitsFilterFunc)
-
   /** A private method for tests, to look at the contents of each partition */
   private[spark] def collectPartitions(): Array[Array[T]] = {
     sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
new file mode 100644
index 0000000000000000000000000000000000000000..3048949ef2f87fffa227a96e0092ae699f621244
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -0,0 +1,24 @@
+package spark.rdd
+
+import spark.{PruneDependency, RDD, SparkEnv, Split, TaskContext}
+
+/**
+ * A RDD used to prune RDD partitions/splits so we can avoid launching tasks on
+ * all partitions. An example use case: If we know the RDD is partitioned by range,
+ * and the execution DAG has a filter on the key, we can avoid launching tasks
+ * on partitions that don't have the range covering the key.
+ */
+class PartitionPruningRDD[T: ClassManifest](
+    @transient prev: RDD[T],
+    @transient partitionFilterFunc: Int => Boolean)
+  extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
+
+  @transient
+  val partitions_ : Array[Split] = dependencies_.head.asInstanceOf[PruneDependency[T]].splits
+
+  override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(split, context)
+
+  override protected def getSplits = partitions_
+
+  override val partitioner = firstParent[T].partitioner
+}
diff --git a/core/src/main/scala/spark/rdd/SplitsPruningRDD.scala b/core/src/main/scala/spark/rdd/SplitsPruningRDD.scala
deleted file mode 100644
index 9b1a210ba3dc05fcbca0b590e60c3b7213259e0e..0000000000000000000000000000000000000000
--- a/core/src/main/scala/spark/rdd/SplitsPruningRDD.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package spark.rdd
-
-import spark.{PruneDependency, RDD, SparkEnv, Split, TaskContext}
-
-/**
- * A RDD used to prune RDD splits so we can avoid launching tasks on
- * all splits. An example use case: If we know the RDD is partitioned by range,
- * and the execution DAG has a filter on the key, we can avoid launching tasks
- * on splits that don't have the range covering the key.
- */
-class SplitsPruningRDD[T: ClassManifest](
-    @transient prev: RDD[T],
-    @transient splitsFilterFunc: Int => Boolean)
-  extends RDD[T](prev.context, List(new PruneDependency(prev, splitsFilterFunc))) {
-
-  @transient
-  val _splits: Array[Split] = dependencies_.head.asInstanceOf[PruneDependency[T]].splits
-
-  override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(split, context)
-
-  override protected def getSplits = _splits
-
-  override val partitioner = firstParent[T].partitioner
-}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 5a3a12dfff8e34c2cd7bbad1213e1905e22d129f..73846131a9dfcaf64c6e42674fb9537d645cb89c 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,7 +3,7 @@ package spark
 import scala.collection.mutable.HashMap
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import spark.SparkContext._
-import spark.rdd.CoalescedRDD
+import spark.rdd.{CoalescedRDD, PartitionPruningRDD}
 
 class RDDSuite extends FunSuite with BeforeAndAfter {
 
@@ -169,11 +169,11 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     }
   }
 
-  test("split pruning") {
+  test("partition pruning") {
     sc = new SparkContext("local", "test")
     val data = sc.parallelize(1 to 10, 10)
     // Note that split number starts from 0, so > 8 means only 10th partition left.
-    val prunedRdd = data.pruneSplits(splitNum => splitNum > 8)
+    val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8)
     assert(prunedRdd.splits.size === 1)
     val prunedData = prunedRdd.collect
     assert(prunedData.size === 1)
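
A minimal usage sketch, not part of the patch: after this change, pruning is requested by constructing a PartitionPruningRDD directly instead of calling RDD.pruneSplits. The snippet mirrors the updated test; the object name, app name, and printed output are assumptions, and it presumes the pre-1.0 "spark" package layout shown in the diff.

import spark.SparkContext
import spark.rdd.PartitionPruningRDD

// Hypothetical driver program illustrating the new constructor-based API.
object PartitionPruningExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "pruning-example")  // app name is arbitrary
    // Ten elements across ten partitions, so partition i holds element i + 1.
    val data = sc.parallelize(1 to 10, 10)
    // Split indices start at 0, so "index > 8" keeps only the 10th partition.
    // Unlike filter(), pruned partitions never have tasks launched on them at all.
    val pruned = new PartitionPruningRDD(data, index => index > 8)
    assert(pruned.splits.size == 1)
    println(pruned.collect.mkString(", "))  // expected: 10
    sc.stop()
  }
}

Constructing the RDD directly keeps pruning out of the core RDD interface, which matches what the diff does: pruneSplits is removed outright rather than renamed.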