diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index 5a67073ef4e27eeaaefc262e1d0b21e37f193180..d5f54d6cbda5971c7c03c1fbd3b810b4ac6ada22 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -22,13 +22,11 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param shuffleId the shuffle id
  * @param rdd the parent RDD
- * @param aggregator optional aggregator; if provided, map-side combining will be performed
  * @param partitioner partitioner used to partition the shuffle output
  */
-class ShuffleDependency[K, V, C](
+class ShuffleDependency[K, V](
     val shuffleId: Int,
     @transient rdd: RDD[(K, V)],
-    val aggregator: Option[Aggregator[K, V, C]],
     val partitioner: Partitioner)
   extends Dependency(rdd)

diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 36cfda9cdba7e5a4a4bc47047e98cba7fb02da25..9cb237804833b57f43360733fefe6ba7af3d8630 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -58,13 +58,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     val aggregator =
       new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
     if (mapSideCombine) {
-      val combiners = new ShuffledRDD[K, V, C](self, Some(aggregator), partitioner)
-      combiners.mapPartitions(aggregator.combineCombinersByKey(_), true)
+      val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
+      val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
+      partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
     } else {
       // Don't apply map-side combiner.
       // A sanity check to make sure mergeCombiners is not defined.
       assert(mergeCombiners == null)
-      val values = new ShuffledRDD[K, V, V](self, None, partitioner)
+      val values = new ShuffledRDD[K, V](self, partitioner)
       values.mapPartitions(aggregator.combineValuesByKey(_), true)
     }
   }
@@ -175,7 +176,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
         createCombiner _, mergeValue _, mergeCombiners _, partitioner)
       bufs.flatMapValues(buf => buf)
     } else {
-      new ShuffledRDD[K, V, V](self, None, partitioner)
+      new ShuffledRDD[K, V](self, partitioner)
     }
   }

@@ -613,7 +614,7 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
    */
   def sortByKey(ascending: Boolean = true, numSplits: Int = self.splits.size): RDD[(K,V)] = {
     val shuffled =
-      new ShuffledRDD[K, V, V](self, None, new RangePartitioner(numSplits, self, ascending))
+      new ShuffledRDD[K, V](self, new RangePartitioner(numSplits, self, ascending))
     shuffled.mapPartitions(iter => {
       val buf = iter.toArray
       if (ascending) {
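
Note (illustrative sketch, not part of the patch): after this change, combineByKey builds map-side combining out of ordinary transformations instead of handing an Aggregator to the shuffle. A minimal hand-rolled equivalent of reduceByKey under the new API might look like this; `pairs`, `partitioner`, and `sums` are hypothetical names, and the Aggregator and ShuffledRDD signatures are the ones introduced above.

    // Combine values locally in each map partition, shuffle the partial
    // combiners, then merge them on the reduce side.
    val aggregator = new Aggregator[String, Int, Int]((v: Int) => v, _ + _, _ + _)
    val partitioner = new HashPartitioner(8)
    val mapSideCombined = pairs.mapPartitions(aggregator.combineValuesByKey(_), true)
    val shuffled = new ShuffledRDD[String, Int](mapSideCombined, partitioner)
    val sums = shuffled.mapPartitions(aggregator.combineCombinersByKey(_), true)
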
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index cc92f1203c847e6d18414735f7129c84e880e6df..551085815cc155f08111b453eb916612f2033513 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -1,9 +1,5 @@
 package spark.rdd

-import java.net.URL
-import java.io.EOFException
-import java.io.ObjectInputStream
-
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap

@@ -43,13 +39,14 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
   override val dependencies = {
     val deps = new ArrayBuffer[Dependency[_]]
     for ((rdd, index) <- rdds.zipWithIndex) {
-      if (rdd.partitioner == Some(part)) {
-        logInfo("Adding one-to-one dependency with " + rdd)
-        deps += new OneToOneDependency(rdd)
+      val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
+      if (mapSideCombinedRDD.partitioner == Some(part)) {
+        logInfo("Adding one-to-one dependency with " + mapSideCombinedRDD)
+        deps += new OneToOneDependency(mapSideCombinedRDD)
       } else {
         logInfo("Adding shuffle dependency with " + rdd)
-        deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](
-            context.newShuffleId, rdd, Some(aggr), part)
+        deps += new ShuffleDependency[Any, ArrayBuffer[Any]](
+            context.newShuffleId, mapSideCombinedRDD, part)
       }
     }
     deps.toList
@@ -62,7 +59,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
     for (i <- 0 until array.size) {
       array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
         dependencies(j) match {
-          case s: ShuffleDependency[_, _, _] =>
+          case s: ShuffleDependency[_, _] =>
             new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep
           case _ =>
             new NarrowCoGroupSplitDep(r, r.splits(i)): CoGroupSplitDep
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 8b1c29b065c1da3eac43cd8bd5d73d8ac7b31aeb..3a173ece1a8528558545334b34bc451e818ecc5e 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -1,6 +1,5 @@
 package spark.rdd

-import spark.Aggregator
 import spark.Partitioner
 import spark.RDD
 import spark.ShuffleDependency
@@ -15,17 +14,13 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {

 /**
  * The resulting RDD from a shuffle (e.g. repartitioning of data).
  * @param parent the parent RDD.
- * @param aggregator if provided, this aggregator will be used to perform map-side combining.
  * @param part the partitioner used to partition the RDD
  * @tparam K the key class.
  * @tparam V the value class.
- * @tparam C if map side combiners are used, then this is the combiner type; otherwise,
- *           this is the same as V.
  */
-class ShuffledRDD[K, V, C](
+class ShuffledRDD[K, V](
     @transient parent: RDD[(K, V)],
-    aggregator: Option[Aggregator[K, V, C]],
-    part: Partitioner) extends RDD[(K, C)](parent.context) {
+    part: Partitioner) extends RDD[(K, V)](parent.context) {

   override val partitioner = Some(part)
@@ -36,10 +31,10 @@ class ShuffledRDD[K, V, C](

   override def preferredLocations(split: Split) = Nil

-  val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part)
+  val dep = new ShuffleDependency(context.newShuffleId, parent, part)
   override val dependencies = List(dep)

-  override def compute(split: Split): Iterator[(K, C)] = {
-    SparkEnv.get.shuffleFetcher.fetch[K, C](dep.shuffleId, split.index)
+  override def compute(split: Split): Iterator[(K, V)] = {
+    SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index)
   }
 }
\ No newline at end of file
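
Note (illustrative sketch, not part of the patch): ShuffledRDD is now purely a repartitioning RDD; it always yields the (K, V) pairs that were written, and any combining happens in the mapPartitions calls around it. A plain hash repartition under the new two-parameter signature could look like this, with `records` a hypothetical RDD[(Int, String)]:

    // Only the data moves across the shuffle; no aggregator is involved.
    val repartitioned: RDD[(Int, String)] =
      new ShuffledRDD[Int, String](records, new HashPartitioner(4))
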
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 6f4c6bffd7b2682ae1be829688e1e00e9ac9787f..aaaed59c4a0453cf406c5b0878285e5482134b19 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -104,7 +104,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
    * The priority value passed in will be used if the stage doesn't already exist with
    * a lower priority (we assume that priorities always increase across jobs for now).
    */
-  def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_,_], priority: Int): Stage = {
+  def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = {
     shuffleToMapStage.get(shuffleDep.shuffleId) match {
       case Some(stage) => stage
       case None =>
@@ -119,7 +119,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
    * as a result stage for the final RDD used directly in an action. The stage will also be given
    * the provided priority.
    */
-  def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]], priority: Int): Stage = {
+  def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = {
     // Kind of ugly: need to register RDDs with the cache and map output tracker here
     // since we can't do it in the RDD constructor because # of splits is unknown
     logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
@@ -149,7 +149,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
         cacheTracker.registerRDD(r.id, r.splits.size)
         for (dep <- r.dependencies) {
           dep match {
-            case shufDep: ShuffleDependency[_,_,_] =>
+            case shufDep: ShuffleDependency[_,_] =>
               parents += getShuffleMapStage(shufDep, priority)
             case _ =>
               visit(dep.rdd)
@@ -172,7 +172,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
         if (locs(p) == Nil) {
           for (dep <- rdd.dependencies) {
             dep match {
-              case shufDep: ShuffleDependency[_,_,_] =>
+              case shufDep: ShuffleDependency[_,_] =>
                 val mapStage = getShuffleMapStage(shufDep, stage.priority)
                 if (!mapStage.isAvailable) {
                   missing += mapStage
@@ -549,7 +549,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       visitedRdds += rdd
       for (dep <- rdd.dependencies) {
         dep match {
-          case shufDep: ShuffleDependency[_,_,_] =>
+          case shufDep: ShuffleDependency[_,_] =>
             val mapStage = getShuffleMapStage(shufDep, stage.priority)
             if (!mapStage.isAvailable) {
               visitedStages += mapStage
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index c97be188449f096b56193d71949f0ffbea5ff17a..60105c42b601badbdae206bd42e7e2fa7386d0f0 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -22,7 +22,7 @@ private[spark] object ShuffleMapTask {
   // expensive on the master node if it needs to launch thousands of tasks.
   val serializedInfoCache = new JHashMap[Int, Array[Byte]]

-  def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = {
+  def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
     synchronized {
       val old = serializedInfoCache.get(stageId)
       if (old != null) {
@@ -41,14 +41,14 @@ private[spark] object ShuffleMapTask {
     }
   }

-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = {
+  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
     synchronized {
       val loader = Thread.currentThread.getContextClassLoader
       val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
       val ser = SparkEnv.get.closureSerializer.newInstance
       val objIn = ser.deserializeStream(in)
       val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-      val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
+      val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
       return (rdd, dep)
     }
   }
@@ -71,7 +71,7 @@ private[spark] object ShuffleMapTask {
 private[spark] class ShuffleMapTask(
     stageId: Int,
     var rdd: RDD[_],
-    var dep: ShuffleDependency[_,_,_],
+    var dep: ShuffleDependency[_,_],
     var partition: Int,
     @transient var locs: Seq[String])
   extends Task[MapStatus](stageId)
@@ -113,33 +113,14 @@ private[spark] class ShuffleMapTask(
     val numOutputSplits = dep.partitioner.numPartitions
     val partitioner = dep.partitioner

-    val bucketIterators =
-      if (dep.aggregator.isDefined) {
-        val aggregator = dep.aggregator.get.asInstanceOf[Aggregator[Any, Any, Any]]
-        // Apply combiners (map-side aggregation) to the map output.
-        val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any])
-        for (elem <- rdd.iterator(split)) {
-          val (k, v) = elem.asInstanceOf[(Any, Any)]
-          val bucketId = partitioner.getPartition(k)
-          val bucket = buckets(bucketId)
-          val existing = bucket.get(k)
-          if (existing == null) {
-            bucket.put(k, aggregator.createCombiner(v))
-          } else {
-            bucket.put(k, aggregator.mergeValue(existing, v))
-          }
-        }
-        buckets.map(_.iterator)
-      } else {
-        // No combiners (no map-side aggregation). Simply partition the map output.
-        val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
-        for (elem <- rdd.iterator(split)) {
-          val pair = elem.asInstanceOf[(Any, Any)]
-          val bucketId = partitioner.getPartition(pair._1)
-          buckets(bucketId) += pair
-        }
-        buckets.map(_.iterator)
-      }
+    // Partition the map output.
+    val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
+    for (elem <- rdd.iterator(split)) {
+      val pair = elem.asInstanceOf[(Any, Any)]
+      val bucketId = partitioner.getPartition(pair._1)
+      buckets(bucketId) += pair
+    }
+    val bucketIterators = buckets.map(_.iterator)

     val compressedSizes = new Array[Byte](numOutputSplits)

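Note (illustrative sketch, not part of the patch): runTask no longer branches on an aggregator; it only hashes each record into an output bucket, since any combining has already happened in the RDD that feeds the task. The same bucketing step shown in isolation, with the hypothetical helper name `partitionIntoBuckets`:

    // Partition-only bucketing, mirroring what runTask now does.
    def partitionIntoBuckets(
        records: Iterator[(Any, Any)],
        partitioner: Partitioner,
        numOutputSplits: Int): Array[Iterator[(Any, Any)]] = {
      val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
      for (pair <- records) {
        buckets(partitioner.getPartition(pair._1)) += pair
      }
      buckets.map(_.iterator)
    }
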
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 1149c00a239aacdcd384cad2bf94936f2e3517a4..4846b6672930367e3b5983ecb5dc7f5399fdf1bd 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -22,7 +22,7 @@ import spark.storage.BlockManagerId
 private[spark] class Stage(
     val id: Int,
     val rdd: RDD[_],
-    val shuffleDep: Option[ShuffleDependency[_,_,_]], // Output shuffle if stage is a map stage
+    val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
     val parents: List[Stage],
     val priority: Int)
   extends Logging {
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 397eb759c01dfc4cbb565543388e5e168547a7db..8170100f1dca27798dcffc1a262b48d1ae706ad0 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -216,35 +216,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     // Test that a shuffle on the file works, because this used to be a bug
     assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
   }
-
-  test("map-side combine") {
-    sc = new SparkContext("local", "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1), (1, 1)), 2)
-
-    // Test with map-side combine on.
-    val sums = pairs.reduceByKey(_+_).collect()
-    assert(sums.toSet === Set((1, 8), (2, 1)))
-
-    val aggregator = new Aggregator[Int, Int, Int](
-      (v: Int) => v,
-      _+_,
-      _+_)
-
-    // Turn off map-side combine and test the results.
-    var shuffledRdd : RDD[(Int, Int)] =
-      new ShuffledRDD[Int, Int, Int](pairs, None, new HashPartitioner(2))
-    shuffledRdd = shuffledRdd.mapPartitions(aggregator.combineValuesByKey(_))
-    assert(shuffledRdd.collect().toSet === Set((1,8), (2, 1)))
-
-    // Run a wrong mergeCombine function with map-side combine on.
-    // We expect to see an exception thrown.
-    val aggregatorWithException = new Aggregator[Int, Int, Int](
-      (v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
-    var shuffledRdd2 : RDD[(Int, Int)] =
-      new ShuffledRDD[Int, Int, Int](pairs, Some(aggregatorWithException), new HashPartitioner(2))
-    shuffledRdd2 = shuffledRdd2.mapPartitions(aggregatorWithException.combineCombinersByKey(_))
-    evaluating { shuffledRdd2.collect() } should produce [SparkException]
-  }
 }
 
 object ShuffleSuite {
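
Note (illustrative sketch, not part of the patch): the removed test exercised ShuffledRDD's aggregator argument, which no longer exists. If equivalent coverage is wanted, a rough reformulation against the new API might look like the following; the test name is hypothetical, while the data and expected results are taken from the removed test.

    test("shuffle without map-side combine") {
      sc = new SparkContext("local", "test")
      val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1), (1, 1)), 2)
      val aggregator = new Aggregator[Int, Int, Int]((v: Int) => v, _+_, _+_)

      // Shuffle the raw pairs, then combine values on the reduce side only.
      val shuffled = new ShuffledRDD[Int, Int](pairs, new HashPartitioner(2))
      val sums = shuffled.mapPartitions(aggregator.combineValuesByKey(_))
      assert(sums.collect().toSet === Set((1, 8), (2, 1)))
    }
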