From 07f568e1bfc67eead88e2c5dbfb9cac23e1ac8bc Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Thu, 24 Jan 2013 15:27:29 -0800
Subject: [PATCH] SPARK-658: Adding logging of stage duration

---
 .../scala/spark/scheduler/DAGScheduler.scala  | 21 +++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index bd541d4207..8aad667182 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -86,6 +86,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
 
   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
+  val stageSubmissionTimes = new HashMap[Stage, Long]
 
   val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
 
@@ -393,6 +394,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       logDebug("New pending tasks: " + myPending)
       taskSched.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
+      if (!stageSubmissionTimes.contains(stage)) {
+        stageSubmissionTimes.put(stage, System.currentTimeMillis())
+      }
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -407,6 +411,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   def handleTaskCompletion(event: CompletionEvent) {
     val task = event.task
     val stage = idToStage(task.stageId)
+
+    def stageFinished(stage: Stage) = { // logs the stage's elapsed time, then un-marks it as running
+      val serviceTime = stageSubmissionTimes.remove(stage) match { // remove() also drops the entry
+        case Some(t) => (System.currentTimeMillis() - t).toString
+        case _ => "Unknown" // no submission time was recorded for this stage
+      }
+      logInfo("%s (%s) finished in %s ms".format(stage, stage.origin, serviceTime))
+      running -= stage
+    }
     event.reason match {
       case Success =>
         logInfo("Completed " + task)
@@ -421,13 +434,13 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
                 if (!job.finished(rt.outputId)) {
                   job.finished(rt.outputId) = true
                   job.numFinished += 1
-                  job.listener.taskSucceeded(rt.outputId, event.result)
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
                     activeJobs -= job
                     resultStageToJob -= stage
-                    running -= stage
+                    stageFinished(stage)
                   }
+                  job.listener.taskSucceeded(rt.outputId, event.result)
                 }
               case None =>
                 logInfo("Ignoring result from " + rt + " because its job has finished")
@@ -444,8 +457,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
               stage.addOutputLoc(smt.partition, status)
             }
             if (running.contains(stage) && pendingTasks(stage).isEmpty) {
-              logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages")
-              running -= stage
+              stageFinished(stage)
+              logInfo("looking for newly runnable stages")
               logInfo("running: " + running)
               logInfo("waiting: " + waiting)
               logInfo("failed: " + failed)
-- 
GitLab