From b4619e509bc3e06baa3b031ef2c1981d3bf02cbd Mon Sep 17 00:00:00 2001
From: Kay Ousterhout <kayousterhout@gmail.com>
Date: Fri, 27 Dec 2013 17:44:32 -0800
Subject: [PATCH] Changed naming of StageCompleted event to be consistent

The rest of the SparkListener events are named with "SparkListener"
as the prefix of the name; this commit renames the StageCompleted
event to SparkListenerStageCompleted for consistency.
---
 .../apache/spark/scheduler/DAGScheduler.scala |  2 +-
 .../apache/spark/scheduler/JobLogger.scala    |  2 +-
 .../spark/scheduler/SparkListener.scala       | 22 ++++++++++++-------
 .../spark/scheduler/SparkListenerBus.scala    |  2 +-
 .../spark/ui/jobs/JobProgressListener.scala   |  2 +-
 .../spark/scheduler/JobLoggerSuite.scala      |  2 +-
 .../spark/scheduler/SparkListenerSuite.scala  |  2 +-
 7 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c48a3d64ef..7603eb292f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -827,7 +827,7 @@ class DAGScheduler(
       }
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
       stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
-      listenerBus.post(StageCompleted(stageToInfos(stage)))
+      listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
       running -= stage
     }
     event.reason match {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 60927831a1..7858080e31 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -297,7 +297,7 @@ class JobLogger(val user: String, val logDirName: String)
    * When stage is completed, record stage completion status
    * @param stageCompleted Stage completed event
    */
-  override def onStageCompleted(stageCompleted: StageCompleted) {
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
     stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
         stageCompleted.stage.stageId))
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index ee63b3c4a1..3becb4f068 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -27,7 +27,7 @@ sealed trait SparkListenerEvents
 case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
      extends SparkListenerEvents
 
-case class StageCompleted(val stage: StageInfo) extends SparkListenerEvents
+case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
 
 case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
 
@@ -47,7 +47,7 @@ trait SparkListener {
   /**
    * Called when a stage is completed, with information on the completed stage
    */
-  def onStageCompleted(stageCompleted: StageCompleted) { }
+  def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }
 
   /**
    * Called when a stage is submitted
@@ -86,7 +86,7 @@ trait SparkListener {
  * Simple SparkListener that logs a few summary statistics when each stage completes
  */
 class StatsReportListener extends SparkListener with Logging {
-  override def onStageCompleted(stageCompleted: StageCompleted) {
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
     import org.apache.spark.scheduler.StatsReportListener._
     implicit val sc = stageCompleted
     this.logInfo("Finished stage: " + stageCompleted.stage)
@@ -119,13 +119,19 @@ object StatsReportListener extends Logging {
   val probabilities = percentiles.map{_ / 100.0}
   val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
 
-  def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = {
+  def extractDoubleDistribution(
+      stage:SparkListenerStageCompleted,
+      getMetric: (TaskInfo,TaskMetrics) => Option[Double])
+    : Option[Distribution] = {
     Distribution(stage.stage.taskInfos.flatMap {
       case ((info,metric)) => getMetric(info, metric)})
   }
 
   //is there some way to setup the types that I can get rid of this completely?
-  def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = {
+  def extractLongDistribution(
+      stage:SparkListenerStageCompleted,
+      getMetric: (TaskInfo,TaskMetrics) => Option[Long])
+    : Option[Distribution] = {
     extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
   }
 
@@ -147,12 +153,12 @@ object StatsReportListener extends Logging {
   }
 
   def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
-    (implicit stage: StageCompleted) {
+    (implicit stage: SparkListenerStageCompleted) {
     showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
   }
 
   def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
-    (implicit stage: StageCompleted) {
+    (implicit stage: SparkListenerStageCompleted) {
     showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
   }
 
@@ -169,7 +175,7 @@ object StatsReportListener extends Logging {
   }
 
   def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
-    (implicit stage: StageCompleted) {
+    (implicit stage: SparkListenerStageCompleted) {
     showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 85687ea330..e7defd768b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -41,7 +41,7 @@ private[spark] class SparkListenerBus() extends Logging {
         event match {
           case stageSubmitted: SparkListenerStageSubmitted =>
             sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
-          case stageCompleted: StageCompleted =>
+          case stageCompleted: SparkListenerStageCompleted =>
             sparkListeners.foreach(_.onStageCompleted(stageCompleted))
           case jobStart: SparkListenerJobStart =>
             sparkListeners.foreach(_.onJobStart(jobStart))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2e51dd5a99..058bc2a2e5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -61,7 +61,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
 
   override def onJobStart(jobStart: SparkListenerJobStart) {}
 
-  override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stage
     poolToActiveStages(stageIdToPool(stage.stageId)) -= stage
     activeStages -= stage
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 002368ff55..d0bd20fc83 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -117,7 +117,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd)  = onTaskEndCount += 1
       override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
       override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
-      override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
+      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1
       override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
     }
     sc.addSparkListener(joblogger)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index d4320e5e14..1a16e438c4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -174,7 +174,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
   class SaveStageInfo extends SparkListener {
     val stageInfos = Buffer[StageInfo]()
-    override def onStageCompleted(stage: StageCompleted) {
+    override def onStageCompleted(stage: SparkListenerStageCompleted) {
       stageInfos += stage.stage
     }
   }
-- 
GitLab