diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 1e4dec86a0530fcb48094d9650f310fb5a8b096a..75ea535f2f57be20c4e7ec9289a8da5e306162d4 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -149,6 +149,9 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: case JobFailed(e: Exception) => scala.util.Failure(e) } } + + /** Get the corresponding job id for this action. */ + def jobId = jobWaiter.jobId } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index f917cfd1419ec6c9c9e2b146a54356791a52cd11..545bc0e9e99ed14490c43ca86bdfab6b4ca286f6 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext} import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag @@ -574,4 +574,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name + /** + * :: Experimental :: + * The asynchronous version of the foreach action. + * + * @param f the function to apply to all the elements of the RDD + * @return a FutureAction for the action + */ + @Experimental + def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = { + import org.apache.spark.SparkContext._ + rdd.foreachAsync(x => f.call(x)) + } + } diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index aed951a40b40c75d9c94e2576a7add63fa4b79a7..b62f3fbdc4a15f946cedfe79b36351def5d3465d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -112,7 +112,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi * Applies a function f to all elements of this RDD. */ def foreachAsync(f: T => Unit): FutureAction[Unit] = { - self.context.submitJob[T, Unit, Unit](self, _.foreach(f), Range(0, self.partitions.size), + val cleanF = self.context.clean(f) + self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size), (index, data) => Unit, Unit) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index e9bfee2248e5b6559e8e237a2ee76580859e5e39..29879b374b8014fffc167bc9831690a247f59ce9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -23,7 +23,7 @@ package org.apache.spark.scheduler */ private[spark] class JobWaiter[T]( dagScheduler: DAGScheduler, - jobId: Int, + val jobId: Int, totalTasks: Int, resultHandler: (Int, T) => Unit) extends JobListener { diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index fe8ffe6d97a05e2a4c840a02c79e8d939d44910a..a2f1b3582ab71e601e2ffa4a555021f9bf9120b5 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -41,6 +41,9 @@ object MimaExcludes { Seq( // Adding new method to JavaRDLike trait - we should probably mark this as a developer API. ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"), + // Should probably mark this as Experimental + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.foreachAsync"), // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values // for countApproxDistinct* functions, which does not work in Java. We later removed // them, and use the following to tell Mima to not care about them.