From fbf2678c16acc0071ebd1cbdd165702635be5f0c Mon Sep 17 00:00:00 2001
From: lirui <rui.li@intel.com>
Date: Mon, 1 Sep 2014 23:28:19 -0700
Subject: [PATCH] SPARK-2636: Expose job ID in JobWaiter API

This PR adds the async actions to the Java API. Users can call these async actions to get a FutureAction, and use the JobWaiter (for a SimpleFutureAction) to retrieve the job ID.
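
As a rough usage sketch (the driver setup, object name, and sample data below are illustrative placeholders, not part of this patch), a Java-API caller could submit a job asynchronously and read back its job ID like this:

    import java.util.Arrays

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    import org.apache.spark.SimpleFutureAction
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.api.java.function.VoidFunction

    object AsyncExample {
      def main(args: Array[String]): Unit = {
        val jsc = new JavaSparkContext("local", "AsyncExample")
        val rdd = jsc.parallelize(Arrays.asList[Integer](1, 2, 3, 4))

        // foreachAsync submits the job and returns a FutureAction right
        // away, instead of blocking like a plain foreach.
        val future = rdd.foreachAsync(new VoidFunction[Integer] {
          override def call(x: Integer): Unit = println(x)
        })

        // foreachAsync is backed by a SimpleFutureAction, which now
        // exposes the ID of the submitted job through its JobWaiter.
        future match {
          case f: SimpleFutureAction[_] => println("Submitted job " + f.jobId)
          case _ => // other FutureAction types have no single job ID
        }

        // Wait for the job to finish, then shut down.
        Await.result(future, Duration.Inf)
        jsc.stop()
      }
    }

Since the action returns as soon as the job is submitted, the driver can keep track of the job by its ID, or cancel it through the returned FutureAction, while the job runs.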

Author: lirui <rui.li@intel.com>

Closes #2176 from lirui-intel/SPARK-2636 and squashes the following commits:

ccaafb7 [lirui] SPARK-2636: fix java doc
5536d55 [lirui] SPARK-2636: mark the async API as experimental
e2e01d5 [lirui] SPARK-2636: add mima exclude
0ca320d [lirui] SPARK-2636: fix method name & javadoc
3fa39f7 [lirui] SPARK-2636: refine the patch
af4f5d9 [lirui] SPARK-2636: remove unused imports
843276c [lirui] SPARK-2636: only keep foreachAsync in the java API
fbf5744 [lirui] SPARK-2636: add more async actions for java api
1b25abc [lirui] SPARK-2636: expose some fields in JobWaiter
d09f732 [lirui] SPARK-2636: fix build
eb1ee79 [lirui] SPARK-2636: change some parameters in SimpleFutureAction to member field
6e2b87b [lirui] SPARK-2636: add java API for async actions
---
 .../scala/org/apache/spark/FutureAction.scala     |  3 +++
 .../org/apache/spark/api/java/JavaRDDLike.scala   | 15 ++++++++++++++-
 .../org/apache/spark/rdd/AsyncRDDActions.scala    |  3 ++-
 .../org/apache/spark/scheduler/JobWaiter.scala    |  2 +-
 project/MimaExcludes.scala                        |  3 +++
 5 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 1e4dec86a0..75ea535f2f 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -149,6 +149,9 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
       case JobFailed(e: Exception) => scala.util.Failure(e)
     }
   }
+
+  /** Get the corresponding job id for this action. */
+  def jobId = jobWaiter.jobId
 }
 
 
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index f917cfd141..545bc0e9e9 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -574,4 +574,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   def name(): String = rdd.name
 
+  /**
+   * :: Experimental ::
+   * The asynchronous version of the foreach action.
+   *
+   * @param f the function to apply to all the elements of the RDD
+   * @return a FutureAction for the action
+   */
+  @Experimental
+  def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
+    import org.apache.spark.SparkContext._
+    rdd.foreachAsync(x => f.call(x))
+  }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index aed951a40b..b62f3fbdc4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -112,7 +112,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    * Applies a function f to all elements of this RDD.
    */
   def foreachAsync(f: T => Unit): FutureAction[Unit] = {
-    self.context.submitJob[T, Unit, Unit](self, _.foreach(f), Range(0, self.partitions.size),
+    val cleanF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
       (index, data) => Unit, Unit)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index e9bfee2248..29879b374b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -23,7 +23,7 @@ package org.apache.spark.scheduler
  */
 private[spark] class JobWaiter[T](
     dagScheduler: DAGScheduler,
-    jobId: Int,
+    val jobId: Int,
     totalTasks: Int,
     resultHandler: (Int, T) => Unit)
   extends JobListener {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fe8ffe6d97..a2f1b3582a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -41,6 +41,9 @@ object MimaExcludes {
           Seq(
             // Adding new method to JavaRDLike trait - we should probably mark this as a developer API.
             ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
+            // Should probably mark this as Experimental
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
             // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
             // for countApproxDistinct* functions, which does not work in Java. We later removed
             // them, and use the following to tell Mima to not care about them.
-- 
GitLab