diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 231e23a7deacc222bd8f0cc2cc2c5c61e5882fa0..cc3cca25713592fa009a0d70ac94b35118b9a422 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -465,7 +465,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
         val res = self.context.runJob(self, process _, Array(index), false)
         res(0)
       case None =>
-        self.filter(_._1 == key).map(_._2).collect
+        self.filter(_._1 == key).map(_._2).collect()
     }
   }
 
@@ -590,7 +590,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
 
       var count = 0
       while(iter.hasNext) {
-        val record = iter.next
+        val record = iter.next()
         count += 1
         writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
       }
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 010e61dfdcb2515e5f64f691fb8b14a877700499..9d6ea782bd83c000f5c7d666018a2f2587e3759d 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -389,16 +389,18 @@ abstract class RDD[T: ClassManifest](
         None
       }
     }
-    val options = sc.runJob(this, reducePartition)
-    val results = new ArrayBuffer[T]
-    for (opt <- options; elem <- opt) {
-      results += elem
-    }
-    if (results.size == 0) {
-      throw new UnsupportedOperationException("empty collection")
-    } else {
-      return results.reduceLeft(cleanF)
+    var jobResult: Option[T] = None
+    val mergeResult = (index: Int, taskResult: Option[T]) => {
+      if (taskResult != None) {
+        jobResult = jobResult match {
+          case Some(value) => Some(f(value, taskResult.get))
+          case None => taskResult
+        }
+      }
     }
+    sc.runJob(this, reducePartition, mergeResult)
+    // Get the final result out of our Option, or throw an exception if the RDD was empty
+    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
   }
 
   /**
@@ -408,9 +410,13 @@ abstract class RDD[T: ClassManifest](
    * modify t2.
    */
   def fold(zeroValue: T)(op: (T, T) => T): T = {
+    // Clone the zero value since we will also be serializing it as part of tasks
+    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
     val cleanOp = sc.clean(op)
-    val results = sc.runJob(this, (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp))
-    return results.fold(zeroValue)(cleanOp)
+    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
+    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
+    sc.runJob(this, foldPartition, mergeResult)
+    jobResult
   }
 
   /**
@@ -422,11 +428,14 @@ abstract class RDD[T: ClassManifest](
    * allocation.
    */
   def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
+    // Clone the zero value since we will also be serializing it as part of tasks
+    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
     val cleanSeqOp = sc.clean(seqOp)
     val cleanCombOp = sc.clean(combOp)
-    val results = sc.runJob(this,
-        (iter: Iterator[T]) => iter.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
-    return results.fold(zeroValue)(cleanCombOp)
+    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
+    sc.runJob(this, aggregatePartition, mergeResult)
+    jobResult
   }
 
   /**
@@ -437,7 +446,7 @@ abstract class RDD[T: ClassManifest](
       var result = 0L
       while (iter.hasNext) {
         result += 1L
-        iter.next
+        iter.next()
       }
       result
     }).sum
@@ -452,7 +461,7 @@ abstract class RDD[T: ClassManifest](
       var result = 0L
      while (iter.hasNext) {
         result += 1L
-        iter.next
+        iter.next()
       }
       result
     }
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index b0d4b582402222bc1290550e47a8acc01a6db9cb..ddbf8f95d9390bc37654328ccf07950eff6abb3f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -543,26 +543,42 @@ class SparkContext(
   }
 
   /**
-   * Run a function on a given set of partitions in an RDD and return the results. This is the main
-   * entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies
-   * whether the scheduler can run the computation on the driver rather than shipping it out to the
-   * cluster, for short actions like first().
+   * Run a function on a given set of partitions in an RDD and pass the results to the given
+   * handler function. This is the main entry point for all actions in Spark. The allowLocal
+   * flag specifies whether the scheduler can run the computation on the driver rather than
+   * shipping it out to the cluster, for short actions like first().
    */
   def runJob[T, U: ClassManifest](
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
-      allowLocal: Boolean
-    ): Array[U] = {
+      allowLocal: Boolean,
+      resultHandler: (Int, U) => Unit) {
     val callSite = Utils.getSparkCallSite
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
-    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal)
+    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler)
    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
     result
   }
 
+  /**
+   * Run a function on a given set of partitions in an RDD and return the results as an array. The
+   * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
+   * than shipping it out to the cluster, for short actions like first().
+   */
+  def runJob[T, U: ClassManifest](
+      rdd: RDD[T],
+      func: (TaskContext, Iterator[T]) => U,
+      partitions: Seq[Int],
+      allowLocal: Boolean
+    ): Array[U] = {
+    val results = new Array[U](partitions.size)
+    runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
+    results
+  }
+
   /**
    * Run a job on a given set of partitions of an RDD, but take a function of type
    * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
@@ -590,6 +606,29 @@ class SparkContext(
     runJob(rdd, func, 0 until rdd.splits.size, false)
   }
 
+  /**
+   * Run a job on all partitions in an RDD and pass the results to a handler function.
+   */
+  def runJob[T, U: ClassManifest](
+      rdd: RDD[T],
+      processPartition: (TaskContext, Iterator[T]) => U,
+      resultHandler: (Int, U) => Unit)
+  {
+    runJob[T, U](rdd, processPartition, 0 until rdd.splits.size, false, resultHandler)
+  }
+
+  /**
+   * Run a job on all partitions in an RDD and pass the results to a handler function.
+   */
+  def runJob[T, U: ClassManifest](
+      rdd: RDD[T],
+      processPartition: Iterator[T] => U,
+      resultHandler: (Int, U) => Unit)
+  {
+    val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
+    runJob[T, U](rdd, processFunc, 0 until rdd.splits.size, false, resultHandler)
+  }
+
   /**
    * Run a job that can return approximate results.
    */
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 1e58d012731b5888e789bf59d2b3eef1e5ee566f..28d643abca8f42686f7174cff3aad0d6dd338285 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -12,6 +12,7 @@ import scala.io.Source
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import scala.Some
+import spark.serializer.SerializerInstance
 
 /**
  * Various utility methods used by Spark.
@@ -446,4 +447,11 @@ private object Utils extends Logging {
     socket.close()
     portBound
   }
+
+  /**
+   * Clone an object using a Spark serializer.
+   */
+  def clone[T](value: T, serializer: SerializerInstance): T = {
+    serializer.deserialize[T](serializer.serialize(value))
+  }
 }
diff --git a/core/src/main/scala/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/spark/partial/ApproximateActionListener.scala
index 42f46e06ed0f4dabe06e149de758965d029e5d83..24b4909380c66fb6f49dee5721e60229ebd8ba76 100644
--- a/core/src/main/scala/spark/partial/ApproximateActionListener.scala
+++ b/core/src/main/scala/spark/partial/ApproximateActionListener.scala
@@ -32,7 +32,7 @@ private[spark] class ApproximateActionListener[T, U, R](
       if (finishedTasks == totalTasks) {
         // If we had already returned a PartialResult, set its final value
         resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
-        // Notify any waiting thread that may have called getResult
+        // Notify any waiting thread that may have called awaitResult
         this.notifyAll()
       }
     }
@@ -49,7 +49,7 @@ private[spark] class ApproximateActionListener[T, U, R](
    * Waits for up to timeout milliseconds since the listener was created and then returns a
    * PartialResult with the result so far. This may be complete if the whole job is done.
    */
-  def getResult(): PartialResult[R] = synchronized {
+  def awaitResult(): PartialResult[R] = synchronized {
     val finishTime = startTime + timeout
     while (true) {
       val time = System.currentTimeMillis()
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 14f61f7e87bc8c399e215443cdc969dcf37b6e1f..908a22b2dfa0e39cc1aedc622142601146a93481 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -203,18 +203,17 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
       callSite: String,
-      allowLocal: Boolean)
-    : Array[U] =
+      allowLocal: Boolean,
+      resultHandler: (Int, U) => Unit)
   {
     if (partitions.size == 0) {
-      return new Array[U](0)
+      return
     }
-    val waiter = new JobWaiter(partitions.size)
+    val waiter = new JobWaiter(partitions.size, resultHandler)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter))
-    waiter.getResult() match {
-      case JobSucceeded(results: Seq[_]) =>
-        return results.asInstanceOf[Seq[U]].toArray
+    waiter.awaitResult() match {
+      case JobSucceeded => {}
       case JobFailed(exception: Exception) =>
         logInfo("Failed to run " + callSite)
         throw exception
@@ -233,7 +232,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.splits.size).toArray
     eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener))
-    return listener.getResult()  // Will throw an exception if the job fails
+    return listener.awaitResult()  // Will throw an exception if the job fails
   }
 
   /**
diff --git a/core/src/main/scala/spark/scheduler/JobResult.scala b/core/src/main/scala/spark/scheduler/JobResult.scala
index c4a74e526f0035986fb220ba1e5c598978c821bb..654131ee8417b209f735074856da271b04414233 100644
--- a/core/src/main/scala/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/spark/scheduler/JobResult.scala
@@ -5,5 +5,5 @@ package spark.scheduler
  */
 private[spark] sealed trait JobResult
 
-private[spark] case class JobSucceeded(results: Seq[_]) extends JobResult
+private[spark] case object JobSucceeded extends JobResult
 private[spark] case class JobFailed(exception: Exception) extends JobResult
diff --git a/core/src/main/scala/spark/scheduler/JobWaiter.scala b/core/src/main/scala/spark/scheduler/JobWaiter.scala
index b3d4feebe5e621cde40260c06a9883ea42fa9958..3cc6a86345fd195828b999f7b33b7c9c31e7d111 100644
--- a/core/src/main/scala/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/spark/scheduler/JobWaiter.scala
@@ -3,10 +3,12 @@ package spark.scheduler
 import scala.collection.mutable.ArrayBuffer
 
 /**
- * An object that waits for a DAGScheduler job to complete.
+ * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
+ * results to the given handler function.
  */
-private[spark] class JobWaiter(totalTasks: Int) extends JobListener {
-  private val taskResults = ArrayBuffer.fill[Any](totalTasks)(null)
+private[spark] class JobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit)
+  extends JobListener {
+
   private var finishedTasks = 0
 
   private var jobFinished = false  // Is the job as a whole finished (succeeded or failed)?
@@ -17,11 +19,11 @@ private[spark] class JobWaiter(totalTasks: Int) extends JobListener {
       if (jobFinished) {
         throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
       }
-      taskResults(index) = result
+      resultHandler(index, result.asInstanceOf[T])
       finishedTasks += 1
       if (finishedTasks == totalTasks) {
         jobFinished = true
-        jobResult = JobSucceeded(taskResults)
+        jobResult = JobSucceeded
         this.notifyAll()
       }
     }
@@ -38,7 +40,7 @@ private[spark] class JobWaiter(totalTasks: Int) extends JobListener {
     }
   }
 
-  def getResult(): JobResult = synchronized {
+  def awaitResult(): JobResult = synchronized {
     while (!jobFinished) {
       this.wait()
     }
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index ed03e651537fdc5ff546566dd7a3de0ddf121470..95d2e627306f6a83fc10b406b1564a1a4a5d7027 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -12,9 +12,9 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
-    assert(dups.distinct.count === 4)
-    assert(dups.distinct().collect === dups.distinct.collect)
-    assert(dups.distinct(2).collect === dups.distinct.collect)
+    assert(dups.distinct().count === 4)
+    assert(dups.distinct().collect === dups.distinct().collect)
+    assert(dups.distinct(2).collect === dups.distinct().collect)
     assert(nums.reduce(_ + _) === 10)
     assert(nums.fold(0)(_ + _) === 10)
     assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4"))
@@ -31,6 +31,10 @@ class RDDSuite extends FunSuite with LocalSparkContext {
       case(split, iter) => Iterator((split, iter.reduceLeft(_ + _)))
     }
     assert(partitionSumsWithSplit.collect().toList === List((0, 3), (1, 7)))
+
+    intercept[UnsupportedOperationException] {
+      nums.filter(_ > 5).reduce(_ + _)
+    }
   }
 
   test("SparkContext.union") {
@@ -164,7 +168,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     // Note that split number starts from 0, so > 8 means only 10th partition left.
     val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8)
     assert(prunedRdd.splits.size === 1)
-    val prunedData = prunedRdd.collect
+    val prunedData = prunedRdd.collect()
     assert(prunedData.size === 1)
     assert(prunedData(0) === 10)
   }
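
Note (not part of the patch): a minimal usage sketch of the handler-based runJob overload added in SparkContext.scala above, mirroring how RDD.reduce/fold/aggregate now merge per-partition results incrementally instead of materializing an Array[U] on the driver first. The SparkContext value `sc`, the data, and the partition count are assumptions for illustration only.

    import spark.SparkContext

    // Sum each partition on the executors and stream the per-partition sums back to
    // the driver through resultHandler, which is invoked once per finished task.
    def partitionSums(sc: SparkContext): Array[Long] = {
      val rdd = sc.parallelize(1 to 1000, 4)
      val sums = new Array[Long](rdd.splits.size)
      sc.runJob(rdd,
        (iter: Iterator[Int]) => iter.foldLeft(0L)(_ + _),     // processPartition: runs inside tasks
        (index: Int, partSum: Long) => sums(index) = partSum)  // resultHandler: runs in the driver JVM
      sums
    }

The Array-returning runJob kept for compatibility is this same pattern with `(index, res) => results(index) = res` as the handler, as shown in the first SparkContext.scala hunk.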