From 2a5161708f4d2f743c7bd69ed3d98bb7bff46460 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Mon, 10 Mar 2014 16:28:41 -0700
Subject: [PATCH] SPARK-1205: Clean up callSite/origin/generator.

This patch removes the `generator` field and simplifies + documents
the tracking of callsites.

There are two places where we care about call sites, when a job is
run and when an RDD is created. This patch retains both of those
features but does a slight refactoring and renaming to make things
less confusing.

There was another feature of an rdd called the `generator` which was
by default the user class that in which the RDD was created. This is
used exclusively in the JobLogger. It been subsumed by the ability
to name a job group. The job logger can later be refectored to
read the job group directly (will require some work) but for now
this just preserves the default logged value of the user class.
I'm not sure any users ever used the ability to override this.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #106 from pwendell/callsite and squashes the following commits:

fc1d009 [Patrick Wendell] Compile fix
e17fb76 [Patrick Wendell] Review feedback: callSite -> creationSite
62e77ef [Patrick Wendell] Review feedback
576e60b [Patrick Wendell] SPARK-1205: Clean up callSite/origin/generator.
---
 .../scala/org/apache/spark/SparkContext.scala  | 11 +++++------
 .../org/apache/spark/api/java/JavaRDD.scala    |  2 --
 .../apache/spark/api/java/JavaRDDLike.scala    |  5 -----
 .../main/scala/org/apache/spark/rdd/RDD.scala  | 18 ++++--------------
 .../apache/spark/scheduler/DAGScheduler.scala  |  2 +-
 .../org/apache/spark/scheduler/JobLogger.scala | 10 +++-------
 .../org/apache/spark/scheduler/Stage.scala     |  2 +-
 .../scala/org/apache/spark/util/Utils.scala    |  4 ++--
 8 files changed, 16 insertions(+), 38 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cdc0e5a342..745e3fa4e8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -832,13 +832,12 @@ class SparkContext(
     setLocalProperty("externalCallSite", null)
   }
 
+  /**
+   * Capture the current user callsite and return a formatted version for printing. If the user
+   * has overridden the call site, this will return the user's version.
+   */
   private[spark] def getCallSite(): String = {
-    val callSite = getLocalProperty("externalCallSite")
-    if (callSite == null) {
-      Utils.formatSparkCallSite
-    } else {
-      callSite
-    }
+    Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo())
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 91bf404631..01d9357a25 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
     wrapRDD(rdd.subtract(other, p))
 
-  def generator: String = rdd.generator
-
   override def toString = rdd.toString
 
   /** Assign a name to this RDD */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index af0114bee3..a89419bbd1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,6 @@ package org.apache.spark.api.java
 
 import java.util.{Comparator, List => JList}
 
-import scala.Tuple2
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   def name(): String = rdd.name
 
-  /** Reset generator */
-  def setGenerator(_generator: String) = {
-    rdd.setGenerator(_generator)
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3fe56963e0..4afa7523dd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag](
     this
   }
 
-  /** User-defined generator of this RDD*/
-  @transient var generator = Utils.getCallSiteInfo.firstUserClass
-
-  /** Reset generator*/
-  def setGenerator(_generator: String) = {
-    generator = _generator
-  }
-
   /**
    * Set this RDD's storage level to persist its values across operations after the first time
    * it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -1031,8 +1023,9 @@ abstract class RDD[T: ClassTag](
 
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
-  /** Record user function generating this RDD. */
-  @transient private[spark] val origin = sc.getCallSite()
+  /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
+  @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
+  private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo)
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
@@ -1095,10 +1088,7 @@ abstract class RDD[T: ClassTag](
   }
 
   override def toString: String = "%s%s[%d] at %s".format(
-    Option(name).map(_ + " ").getOrElse(""),
-    getClass.getSimpleName,
-    id,
-    origin)
+    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
 
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dc5b25d845..d83d0341c6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -279,7 +279,7 @@ class DAGScheduler(
     } else {
       // Kind of ugly: need to register RDDs with the cache and map output tracker here
       // since we can't do it in the RDD constructor because # of partitions is unknown
-      logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
+      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
       mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
     }
     stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 80f9ec7d03..01cbcc390c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String)
    * @param indent Indent number before info
    */
   protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+    val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
     val rddInfo =
-      if (rdd.getStorageLevel != StorageLevel.NONE) {
-        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
-                rdd.origin + " " + rdd.generator
-      } else {
-        "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
-                rdd.origin + " " + rdd.generator
-      }
+      s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " +
+      s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}"
     jobLogInfo(jobID, indentString(indent) + rddInfo, false)
     rdd.dependencies.foreach {
       case shufDep: ShuffleDependency[_, _] =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index a78b0186b9..5c1fc30e4a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -100,7 +100,7 @@ private[spark] class Stage(
     id
   }
 
-  val name = callSite.getOrElse(rdd.origin)
+  val name = callSite.getOrElse(rdd.getCreationSite)
 
   override def toString = "Stage " + id
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ac376fc403..38a275d438 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -719,8 +719,8 @@ private[spark] object Utils extends Logging {
     new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
   }
 
-  def formatSparkCallSite = {
-    val callSiteInfo = getCallSiteInfo
+  /** Returns a printable version of the call site info suitable for logs. */
+  def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = {
     "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
                          callSiteInfo.firstUserLine)
   }
-- 
GitLab