From f442e7d83c93c894215427f5ef86c96d61160e0e Mon Sep 17 00:00:00 2001
From: Stephen Haberman <stephen@exigencecorp.com>
Date: Sun, 24 Feb 2013 00:27:14 -0600
Subject: [PATCH] Update for split->partition rename.

---
 core/src/main/scala/spark/RDD.scala           |  8 +++----
 .../main/scala/spark/rdd/SubtractedRDD.scala  | 24 +++++++++----------
 core/src/test/scala/spark/ShuffleSuite.scala  |  2 +-
 3 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 27a4d2d287..9e8eaee756 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -397,17 +397,17 @@ abstract class RDD[T: ClassManifest](
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
    *
-   * Uses `this` partitioner/split size, because even if `other` is huge, the resulting
+   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
    * RDD will be <= us.
    */
   def subtract(other: RDD[T]): RDD[T] =
-    subtract(other, partitioner.getOrElse(new HashPartitioner(splits.size)))
+    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
 
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
    */
-  def subtract(other: RDD[T], numSplits: Int): RDD[T] =
-    subtract(other, new HashPartitioner(numSplits))
+  def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
+    subtract(other, new HashPartitioner(numPartitions))
 
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 244874e4e0..daf9cc993c 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -6,7 +6,7 @@ import spark.RDD
 import spark.Partitioner
 import spark.Dependency
 import spark.TaskContext
-import spark.Split
+import spark.Partition
 import spark.SparkEnv
 import spark.ShuffleDependency
 import spark.OneToOneDependency
@@ -24,7 +24,7 @@ import spark.OneToOneDependency
  * touch each once to decide if the value needs to be removed.
  *
  * This is particularly helpful when `rdd1` is much smaller than `rdd2`, as
- * you can use `rdd1`'s partitioner/split size and not worry about running
+ * you can use `rdd1`'s partitioner/partition size and not worry about running
  * out of memory because of the size of `rdd2`.
  */
 private[spark] class SubtractedRDD[T: ClassManifest](
@@ -63,16 +63,16 @@ private[spark] class SubtractedRDD[T: ClassManifest](
     }
   }
 
-  override def getSplits: Array[Split] = {
-    val array = new Array[Split](part.numPartitions)
+  override def getPartitions: Array[Partition] = {
+    val array = new Array[Partition](part.numPartitions)
     for (i <- 0 until array.size) {
-      // Each CoGroupSplit will dependend on rdd1 and rdd2
-      array(i) = new CoGroupSplit(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
+      // Each CoGroupPartition will depend on rdd1 and rdd2
+      array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
         dependencies(j) match {
           case s: ShuffleDependency[_, _] =>
             new ShuffleCoGroupSplitDep(s.shuffleId)
           case _ =>
-            new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i))
+            new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
         }
       }.toList)
     }
@@ -81,21 +81,21 @@ private[spark] class SubtractedRDD[T: ClassManifest](
 
   override val partitioner = Some(part)
 
-  override def compute(s: Split, context: TaskContext): Iterator[T] = {
-    val split = s.asInstanceOf[CoGroupSplit]
+  override def compute(p: Partition, context: TaskContext): Iterator[T] = {
+    val partition = p.asInstanceOf[CoGroupPartition]
     val set = new JHashSet[T]
     def integrate(dep: CoGroupSplitDep, op: T => Unit) = dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
         for (k <- rdd.iterator(itsSplit, context))
           op(k.asInstanceOf[T])
       case ShuffleCoGroupSplitDep(shuffleId) =>
-        for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, split.index))
+        for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index))
           op(k.asInstanceOf[T])
     }
     // the first dep is rdd1; add all keys to the set
-    integrate(split.deps(0), set.add)
+    integrate(partition.deps(0), set.add)
     // the second dep is rdd2; remove all of its keys from the set
-    integrate(split.deps(1), set.remove)
+    integrate(partition.deps(1), set.remove)
     set.iterator
   }
 
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 05fb280d0a..77e0eab829 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -241,7 +241,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     val b = sc.parallelize(Array(2, 3, 4), 4)
     val c = a.subtract(b)
     assert(c.collect().toSet === Set(1))
-    assert(c.splits.size === a.splits.size)
+    assert(c.partitions.size === a.partitions.size)
   }
 
   test("subtract with narrow dependency") {
--
GitLab
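
For reference only, and not part of the patch itself: a minimal Scala sketch of how the renamed subtract API reads after this change. It assumes a running SparkContext named `sc`, mirroring the ShuffleSuite test above; the explicit three-partition call is purely illustrative.

    // Build two small RDDs; `a` has 2 partitions, `b` has 4.
    val a = sc.parallelize(Array(1, 2, 3, 4), 2)
    val b = sc.parallelize(Array(2, 3, 4), 4)

    // By default, subtract reuses `a`'s partitioner (or a HashPartitioner
    // sized to `a`'s partition count), so the result keeps a's partitioning.
    val c = a.subtract(b)
    assert(c.collect().toSet == Set(1))
    assert(c.partitions.size == a.partitions.size)

    // The renamed overload takes numPartitions (formerly numSplits) and
    // hash-partitions the result into that many partitions.
    val d = a.subtract(b, 3)
    assert(d.partitions.size == 3)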