diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala index b0daa70cfdf14f3f59ee45ade35b251a1472c5e5..df8ce9c054f75fb81fcebe994d28067f8284804b 100644 --- a/core/src/main/scala/spark/Aggregator.scala +++ b/core/src/main/scala/spark/Aggregator.scala @@ -1,17 +1,44 @@ package spark +import java.util.{HashMap => JHashMap} + +import scala.collection.JavaConversions._ + /** A set of functions used to aggregate data. * * @param createCombiner function to create the initial value of the aggregation. * @param mergeValue function to merge a new value into the aggregation result. * @param mergeCombiners function to merge outputs from multiple mergeValue function. - * @param mapSideCombine whether to apply combiners on map partitions, also - * known as map-side aggregations. When set to false, - * mergeCombiners function is not used. */ case class Aggregator[K, V, C] ( val createCombiner: V => C, val mergeValue: (C, V) => C, - val mergeCombiners: (C, C) => C, - val mapSideCombine: Boolean = true) + val mergeCombiners: (C, C) => C) { + + def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = { + val combiners = new JHashMap[K, C] + for ((k, v) <- iter) { + val oldC = combiners.get(k) + if (oldC == null) { + combiners.put(k, createCombiner(v)) + } else { + combiners.put(k, mergeValue(oldC, v)) + } + } + combiners.iterator + } + + def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = { + val combiners = new JHashMap[K, C] + for ((k, c) <- iter) { + val oldC = combiners.get(k) + if (oldC == null) { + combiners.put(k, c) + } else { + combiners.put(k, mergeCombiners(oldC, c)) + } + } + combiners.iterator + } +} diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 4554db2249944382eb40ed60c8a5456149ef8290..86432d0127e3e9a75d24177ebc8558d453e0ecc8 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -1,18 +1,12 @@ package spark -import java.io.EOFException -import java.net.URL - import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import spark.storage.BlockException import spark.storage.BlockManagerId -import it.unimi.dsi.fastutil.io.FastBufferedInputStream - private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { - def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) { + override def fetch[K, V](shuffleId: Int, reduceId: Int) = { logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId)) val blockManager = SparkEnv.get.blockManager @@ -31,14 +25,12 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin (address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2))) } - for ((blockId, blockOption) <- blockManager.getMultiple(blocksByAddress)) { + def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[(K, V)] = { + val blockId = blockPair._1 + val blockOption = blockPair._2 blockOption match { case Some(block) => { - val values = block - for(value <- values) { - val v = value.asInstanceOf[(K, V)] - func(v._1, v._2) - } + block.asInstanceOf[Iterator[(K, V)]] } case None => { val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r @@ -53,8 +45,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin } } } - - logDebug("Fetching and merging outputs of shuffle %d, reduce %d took %d 
ms".format( - shuffleId, reduceId, System.currentTimeMillis - startTime)) + blockManager.getMultiple(blocksByAddress).flatMap(unpackBlock) } } diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index dfc7e292b737983aa6a5317e7c41ab5d06388626..b85d2732db91919ad708d94e4e3a73c85b9d5bbb 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -22,12 +22,10 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { * Represents a dependency on the output of a shuffle stage. * @param shuffleId the shuffle id * @param rdd the parent RDD - * @param aggregator optional aggregator; this allows for map-side combining * @param partitioner partitioner used to partition the shuffle output */ -class ShuffleDependency[K, V, C]( +class ShuffleDependency[K, V]( @transient rdd: RDD[(K, V)], - val aggregator: Option[Aggregator[K, V, C]], val partitioner: Partitioner) extends Dependency(rdd) { diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index d693b4e820cfd11f35cbca4fcb867166f0965c18..e5bb639cfd1ac2c9bd9be88bd9e31d80e91e3cd3 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -1,10 +1,6 @@ package spark -import java.io.EOFException -import java.io.ObjectInputStream -import java.net.URL import java.util.{Date, HashMap => JHashMap} -import java.util.concurrent.atomic.AtomicLong import java.text.SimpleDateFormat import scala.collection.Map @@ -14,18 +10,11 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.BytesWritable -import org.apache.hadoop.io.NullWritable -import org.apache.hadoop.io.Text -import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.FileOutputCommitter import org.apache.hadoop.mapred.FileOutputFormat import org.apache.hadoop.mapred.HadoopWriter import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapred.OutputCommitter import org.apache.hadoop.mapred.OutputFormat -import org.apache.hadoop.mapred.SequenceFileOutputFormat -import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext} @@ -64,15 +53,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( partitioner: Partitioner, mapSideCombine: Boolean = true): RDD[(K, C)] = { val aggregator = - if (mapSideCombine) { - new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) - } else { - // Don't apply map-side combiner. - // A sanity check to make sure mergeCombiners is not defined. - assert(mergeCombiners == null) - new Aggregator[K, V, C](createCombiner, mergeValue, null, false) - } - new ShuffledAggregatedRDD(self, aggregator, partitioner) + new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) + if (mapSideCombine) { + val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true) + val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner) + partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true) + } else { + // Don't apply map-side combiner. + // A sanity check to make sure mergeCombiners is not defined. 
+ assert(mergeCombiners == null) + val values = new ShuffledRDD[K, V](self, partitioner) + values.mapPartitions(aggregator.combineValuesByKey(_), true) + } } /** @@ -181,7 +173,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( createCombiner _, mergeValue _, mergeCombiners _, partitioner) bufs.flatMapValues(buf => buf) } else { - new RepartitionShuffledRDD(self, partitioner) + new ShuffledRDD[K, V](self, partitioner) } } @@ -618,7 +610,16 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( * order of the keys). */ def sortByKey(ascending: Boolean = true, numSplits: Int = self.splits.size): RDD[(K,V)] = { - new ShuffledSortedRDD(self, ascending, numSplits) + val shuffled = + new ShuffledRDD[K, V](self, new RangePartitioner(numSplits, self, ascending)) + shuffled.mapPartitions(iter => { + val buf = iter.toArray + if (ascending) { + buf.sortWith((x, y) => x._1 < y._1).iterator + } else { + buf.sortWith((x, y) => x._1 > y._1).iterator + } + }, true) } } diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index ddb420effff76bf0e115c7268849d3c7240c4e62..338dff40618cc8469f693e092439f88e7913e8b2 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -282,8 +282,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial /** * Return a new RDD by applying a function to each partition of this RDD. */ - def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] = - new MapPartitionsRDD(this, sc.clean(f)) + def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U], + preservesPartitioning: Boolean = false): RDD[U] = + new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning) /** * Return a new RDD by applying a function to each partition of this RDD, while tracking the index diff --git a/core/src/main/scala/spark/ShuffleFetcher.scala b/core/src/main/scala/spark/ShuffleFetcher.scala index daa35fe7f2c70c41aa019d3bc89e6ad7b0413d71..d9a94d4021ee325d57aac710f4b0858883f3e63b 100644 --- a/core/src/main/scala/spark/ShuffleFetcher.scala +++ b/core/src/main/scala/spark/ShuffleFetcher.scala @@ -1,10 +1,12 @@ package spark private[spark] abstract class ShuffleFetcher { - // Fetch the shuffle outputs for a given ShuffleDependency, calling func exactly - // once on each key-value pair obtained. - def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) + /** + * Fetch the shuffle outputs for a given ShuffleDependency. + * @return An iterator over the elements of the fetched shuffle outputs. 
+ */ + def fetch[K, V](shuffleId: Int, reduceId: Int) : Iterator[(K, V)] - // Stop the fetcher + /** Stop the fetcher */ def stop() {} } diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index ace25006270239799ae8ce823887e04df8d8c58b..50bec9e63b48310a836c16f2c5cad6865331baa3 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -1,9 +1,5 @@ package spark.rdd -import java.net.URL -import java.io.EOFException -import java.io.ObjectInputStream - import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -43,12 +39,13 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner) override val dependencies = { val deps = new ArrayBuffer[Dependency[_]] for ((rdd, index) <- rdds.zipWithIndex) { - if (rdd.partitioner == Some(part)) { - logInfo("Adding one-to-one dependency with " + rdd) - deps += new OneToOneDependency(rdd) + val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true) + if (mapSideCombinedRDD.partitioner == Some(part)) { + logInfo("Adding one-to-one dependency with " + mapSideCombinedRDD) + deps += new OneToOneDependency(mapSideCombinedRDD) } else { logInfo("Adding shuffle dependency with " + rdd) - deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](rdd, Some(aggr), part) + deps += new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part) } } deps.toList @@ -61,7 +58,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner) for (i <- 0 until array.size) { array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) => dependencies(j) match { - case s: ShuffleDependency[_, _, _] => + case s: ShuffleDependency[_, _] => new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep case _ => new NarrowCoGroupSplitDep(r, r.splits(i)): CoGroupSplitDep @@ -93,13 +90,13 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner) } case ShuffleCoGroupSplitDep(shuffleId) => { // Read map outputs of shuffle - def mergePair(k: K, vs: Seq[Any]) { - val mySeq = getSeq(k) - for (v <- vs) + def mergePair(pair: (K, Seq[Any])) { + val mySeq = getSeq(pair._1) + for (v <- pair._2) mySeq(depNum) += v } val fetcher = SparkEnv.get.shuffleFetcher - fetcher.fetch[K, Seq[Any]](shuffleId, split.index, mergePair) + fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair) } } map.iterator diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala index b2c7a1cb9e4d1a89401098b493b7f281e97644c8..a904ef62c3f397bccc81bea6b876840b080dab25 100644 --- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala @@ -7,8 +7,11 @@ import spark.Split private[spark] class MapPartitionsRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], - f: Iterator[T] => Iterator[U]) + f: Iterator[T] => Iterator[U], + preservesPartitioning: Boolean = false) extends RDD[U](prev.context) { + + override val partitioner = if (preservesPartitioning) prev.partitioner else None override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index be120acc71565ffdc740c369c501a228d9aa8e6d..145e419c53b508e4a76a8eb53595ea55b77075de 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ 
b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -1,11 +1,6 @@ package spark.rdd -import scala.collection.mutable.ArrayBuffer -import java.util.{HashMap => JHashMap} - -import spark.Aggregator import spark.Partitioner -import spark.RangePartitioner import spark.RDD import spark.ShuffleDependency import spark.SparkEnv @@ -16,15 +11,16 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split { override def hashCode(): Int = idx } - /** * The resulting RDD from a shuffle (e.g. repartitioning of data). + * @param parent the parent RDD. + * @param part the partitioner used to partition the RDD + * @tparam K the key class. + * @tparam V the value class. */ -abstract class ShuffledRDD[K, V, C]( +class ShuffledRDD[K, V]( @transient parent: RDD[(K, V)], - aggregator: Option[Aggregator[K, V, C]], - part: Partitioner) - extends RDD[(K, C)](parent.context) { + part: Partitioner) extends RDD[(K, V)](parent.context) { override val partitioner = Some(part) @@ -35,108 +31,10 @@ abstract class ShuffledRDD[K, V, C]( override def preferredLocations(split: Split) = Nil - val dep = new ShuffleDependency(parent, aggregator, part) + val dep = new ShuffleDependency(parent, part) override val dependencies = List(dep) -} - - -/** - * Repartition a key-value pair RDD. - */ -class RepartitionShuffledRDD[K, V]( - @transient parent: RDD[(K, V)], - part: Partitioner) - extends ShuffledRDD[K, V, V]( - parent, - None, - part) { override def compute(split: Split): Iterator[(K, V)] = { - val buf = new ArrayBuffer[(K, V)] - val fetcher = SparkEnv.get.shuffleFetcher - def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) } - fetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer) - buf.iterator - } -} - - -/** - * A sort-based shuffle (that doesn't apply aggregation). It does so by first - * repartitioning the RDD by range, and then sort within each range. - */ -class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V]( - @transient parent: RDD[(K, V)], - ascending: Boolean, - numSplits: Int) - extends RepartitionShuffledRDD[K, V]( - parent, - new RangePartitioner(numSplits, parent, ascending)) { - - override def compute(split: Split): Iterator[(K, V)] = { - // By separating this from RepartitionShuffledRDD, we avoided a - // buf.iterator.toArray call, thus avoiding building up the buffer twice. - val buf = new ArrayBuffer[(K, V)] - def addTupleToBuffer(k: K, v: V) { buf += ((k, v)) } - SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer) - if (ascending) { - buf.sortWith((x, y) => x._1 < y._1).iterator - } else { - buf.sortWith((x, y) => x._1 > y._1).iterator - } - } -} - - -/** - * The resulting RDD from shuffle and running (hash-based) aggregation. - */ -class ShuffledAggregatedRDD[K, V, C]( - @transient parent: RDD[(K, V)], - aggregator: Aggregator[K, V, C], - part : Partitioner) - extends ShuffledRDD[K, V, C](parent, Some(aggregator), part) { - - override def compute(split: Split): Iterator[(K, C)] = { - val combiners = new JHashMap[K, C] - val fetcher = SparkEnv.get.shuffleFetcher - - if (aggregator.mapSideCombine) { - // Apply combiners on map partitions. In this case, post-shuffle we get a - // list of outputs from the combiners and merge them using mergeCombiners. 
- def mergePairWithMapSideCombiners(k: K, c: C) { - val oldC = combiners.get(k) - if (oldC == null) { - combiners.put(k, c) - } else { - combiners.put(k, aggregator.mergeCombiners(oldC, c)) - } - } - fetcher.fetch[K, C](dep.shuffleId, split.index, mergePairWithMapSideCombiners) - } else { - // Do not apply combiners on map partitions (i.e. map side aggregation is - // turned off). Post-shuffle we get a list of values and we use mergeValue - // to merge them. - def mergePairWithoutMapSideCombiners(k: K, v: V) { - val oldC = combiners.get(k) - if (oldC == null) { - combiners.put(k, aggregator.createCombiner(v)) - } else { - combiners.put(k, aggregator.mergeValue(oldC, v)) - } - } - fetcher.fetch[K, V](dep.shuffleId, split.index, mergePairWithoutMapSideCombiners) - } - - return new Iterator[(K, C)] { - var iter = combiners.entrySet().iterator() - - def hasNext: Boolean = iter.hasNext() - - def next(): (K, C) = { - val entry = iter.next() - (entry.getKey, entry.getValue) - } - } + SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index) } } diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 6f4c6bffd7b2682ae1be829688e1e00e9ac9787f..aaaed59c4a0453cf406c5b0878285e5482134b19 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -104,7 +104,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with * The priority value passed in will be used if the stage doesn't already exist with * a lower priority (we assume that priorities always increase across jobs for now). */ - def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_,_], priority: Int): Stage = { + def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = { shuffleToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => @@ -119,7 +119,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with * as a result stage for the final RDD used directly in an action. The stage will also be given * the provided priority. 
*/ - def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]], priority: Int): Stage = { + def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = { // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of splits is unknown logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")") @@ -149,7 +149,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with cacheTracker.registerRDD(r.id, r.splits.size) for (dep <- r.dependencies) { dep match { - case shufDep: ShuffleDependency[_,_,_] => + case shufDep: ShuffleDependency[_,_] => parents += getShuffleMapStage(shufDep, priority) case _ => visit(dep.rdd) @@ -172,7 +172,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with if (locs(p) == Nil) { for (dep <- rdd.dependencies) { dep match { - case shufDep: ShuffleDependency[_,_,_] => + case shufDep: ShuffleDependency[_,_] => val mapStage = getShuffleMapStage(shufDep, stage.priority) if (!mapStage.isAvailable) { missing += mapStage @@ -549,7 +549,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with visitedRdds += rdd for (dep <- rdd.dependencies) { dep match { - case shufDep: ShuffleDependency[_,_,_] => + case shufDep: ShuffleDependency[_,_] => val mapStage = getShuffleMapStage(shufDep, stage.priority) if (!mapStage.isAvailable) { visitedStages += mapStage diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 86796d3677fc023f639b33d9021d11e0ef9ee8a6..60105c42b601badbdae206bd42e7e2fa7386d0f0 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -22,7 +22,7 @@ private[spark] object ShuffleMapTask { // expensive on the master node if it needs to launch thousands of tasks. 
val serializedInfoCache = new JHashMap[Int, Array[Byte]] - def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = { + def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = { synchronized { val old = serializedInfoCache.get(stageId) if (old != null) { @@ -41,14 +41,14 @@ private[spark] object ShuffleMapTask { } } - def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = { + def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = { synchronized { val loader = Thread.currentThread.getContextClassLoader val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) val ser = SparkEnv.get.closureSerializer.newInstance val objIn = ser.deserializeStream(in) val rdd = objIn.readObject().asInstanceOf[RDD[_]] - val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]] + val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]] return (rdd, dep) } } @@ -71,7 +71,7 @@ private[spark] object ShuffleMapTask { private[spark] class ShuffleMapTask( stageId: Int, var rdd: RDD[_], - var dep: ShuffleDependency[_,_,_], + var dep: ShuffleDependency[_,_], var partition: Int, @transient var locs: Seq[String]) extends Task[MapStatus](stageId) @@ -113,33 +113,14 @@ private[spark] class ShuffleMapTask( val numOutputSplits = dep.partitioner.numPartitions val partitioner = dep.partitioner - val bucketIterators = - if (dep.aggregator.isDefined && dep.aggregator.get.mapSideCombine) { - val aggregator = dep.aggregator.get.asInstanceOf[Aggregator[Any, Any, Any]] - // Apply combiners (map-side aggregation) to the map output. - val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any]) - for (elem <- rdd.iterator(split)) { - val (k, v) = elem.asInstanceOf[(Any, Any)] - val bucketId = partitioner.getPartition(k) - val bucket = buckets(bucketId) - val existing = bucket.get(k) - if (existing == null) { - bucket.put(k, aggregator.createCombiner(v)) - } else { - bucket.put(k, aggregator.mergeValue(existing, v)) - } - } - buckets.map(_.iterator) - } else { - // No combiners (no map-side aggregation). Simply partition the map output. - val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)]) - for (elem <- rdd.iterator(split)) { - val pair = elem.asInstanceOf[(Any, Any)] - val bucketId = partitioner.getPartition(pair._1) - buckets(bucketId) += pair - } - buckets.map(_.iterator) - } + // Partition the map output. 
+ val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)]) + for (elem <- rdd.iterator(split)) { + val pair = elem.asInstanceOf[(Any, Any)] + val bucketId = partitioner.getPartition(pair._1) + buckets(bucketId) += pair + } + val bucketIterators = buckets.map(_.iterator) val compressedSizes = new Array[Byte](numOutputSplits) diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index 1149c00a239aacdcd384cad2bf94936f2e3517a4..4846b6672930367e3b5983ecb5dc7f5399fdf1bd 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -22,7 +22,7 @@ import spark.storage.BlockManagerId private[spark] class Stage( val id: Int, val rdd: RDD[_], - val shuffleDep: Option[ShuffleDependency[_,_,_]], // Output shuffle if stage is a map stage + val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage val parents: List[Stage], val priority: Int) extends Logging { diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 7f8ec5d48fa664bbbf31e7dfa943d5c1461db01f..8170100f1dca27798dcffc1a262b48d1ae706ad0 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -12,8 +12,8 @@ import org.scalacheck.Prop._ import com.google.common.io.Files -import spark.rdd.ShuffledAggregatedRDD -import SparkContext._ +import spark.rdd.ShuffledRDD +import spark.SparkContext._ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { @@ -216,41 +216,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { // Test that a shuffle on the file works, because this used to be a bug assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) } - - test("map-side combine") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1), (1, 1)), 2) - - // Test with map-side combine on. - val sums = pairs.reduceByKey(_+_).collect() - assert(sums.toSet === Set((1, 8), (2, 1))) - - // Turn off map-side combine and test the results. - val aggregator = new Aggregator[Int, Int, Int]( - (v: Int) => v, - _+_, - _+_, - false) - val shuffledRdd = new ShuffledAggregatedRDD( - pairs, aggregator, new HashPartitioner(2)) - assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1))) - - // Turn map-side combine off and pass a wrong mergeCombine function. Should - // not see an exception because mergeCombine should not have been called. - val aggregatorWithException = new Aggregator[Int, Int, Int]( - (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false) - val shuffledRdd1 = new ShuffledAggregatedRDD( - pairs, aggregatorWithException, new HashPartitioner(2)) - assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1))) - - // Now run the same mergeCombine function with map-side combine on. We - // expect to see an exception thrown. - val aggregatorWithException1 = new Aggregator[Int, Int, Int]( - (v: Int) => v, _+_, ShuffleSuite.mergeCombineException) - val shuffledRdd2 = new ShuffledAggregatedRDD( - pairs, aggregatorWithException1, new HashPartitioner(2)) - evaluating { shuffledRdd2.collect() } should produce [SparkException] - } } object ShuffleSuite {
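
With mapSideCombine gone from Aggregator, the two halves of a hash aggregation are now exposed as plain iterator transformations: combineValuesByKey builds combiners from raw values (the map side) and combineCombinersByKey merges already-built combiners (the reduce side). A minimal sketch of how they compose, using a made-up driver object and sample data (only the Aggregator API itself comes from this patch):

    import spark.Aggregator

    object AggregatorExample {
      def main(args: Array[String]) {
        // Sum the values for each key; the combiner type C is simply Int.
        val agg = Aggregator[String, Int, Int](
          (v: Int) => v,                 // createCombiner
          (c: Int, v: Int) => c + v,     // mergeValue
          (c1: Int, c2: Int) => c1 + c2) // mergeCombiners

        // Map side: each partition is pre-aggregated independently.
        val partial1 = agg.combineValuesByKey(Iterator(("a", 1), ("b", 1), ("a", 1)))
        val partial2 = agg.combineValuesByKey(Iterator(("a", 1), ("c", 1)))

        // Reduce side: partial combiners from all map outputs are merged into final values.
        val merged = agg.combineCombinersByKey(partial1 ++ partial2).toMap
        assert(merged == Map("a" -> 3, "b" -> 1, "c" -> 1))
      }
    }

This is the shape combineByKey now builds out of RDD operations: mapPartitions(combineValuesByKey) before the ShuffledRDD and mapPartitions(combineCombinersByKey) after it, with preservesPartitioning = true so the partitioner survives the post-shuffle map.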
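
Because mapSideCombine now lives on combineByKey rather than on Aggregator, callers toggle it per operation. A usage sketch against the new signature (the object name, local master string, and sample data are illustrative; the expected sums mirror the removed "map-side combine" test):

    import spark.{HashPartitioner, SparkContext}
    import spark.SparkContext._

    object CombineByKeyExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "combineByKeyExample")
        val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1), (1, 1)), 2)

        // Default path: map-side combine on. Partitions are pre-aggregated, shuffled as
        // (key, combiner) pairs, and merged with mergeCombiners on the reduce side.
        val sums = pairs.combineByKey(
          (v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2,
          new HashPartitioner(2))
        assert(sums.collect().toSet == Set((1, 8), (2, 1)))

        // Map-side combine off: raw (key, value) pairs cross the shuffle and are aggregated
        // only on the reduce side, so mergeCombiners is never called and must be passed as null.
        val sumsNoMapSide = pairs.combineByKey(
          (v: Int) => v, (c: Int, v: Int) => c + v, null, new HashPartitioner(2), false)
        assert(sumsNoMapSide.collect().toSet == Set((1, 8), (2, 1)))

        sc.stop()
      }
    }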