From 501433f1d59b1b326c0a7169fa1fd6136f7628e3 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Mon, 28 Jan 2013 10:17:35 -0800
Subject: [PATCH] Making submission time a field

---
 core/src/main/scala/spark/scheduler/DAGScheduler.scala | 7 +++----
 core/src/main/scala/spark/scheduler/Stage.scala        | 3 +++
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index bce7418e87..7ba1f3430a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -86,7 +86,6 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
 
   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
-  val stageSubmissionTimes = new HashMap[Stage, Long]
 
   val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
 
@@ -394,8 +393,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       logDebug("New pending tasks: " + myPending)
       taskSched.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
-      if (!stageSubmissionTimes.contains(stage)) {
-        stageSubmissionTimes.put(stage, System.currentTimeMillis())
+      if (!stage.submissionTime.isDefined) {
+        stage.submissionTime = Some(System.currentTimeMillis())
       }
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
@@ -413,7 +412,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     val stage = idToStage(task.stageId)
 
     def markStageAsFinished(stage: Stage) = {
-      val serviceTime = stageSubmissionTimes.remove(stage) match {
+      val serviceTime = stage.submissionTime match {
         case Some(t) => (System.currentTimeMillis() - t).toString
         case _ => "Unkown"
       }
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index e9419728e3..374114d870 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -32,6 +32,9 @@ private[spark] class Stage(
   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
   var numAvailableOutputs = 0
 
+  /** Wall-clock time (ms) when this stage's tasks were first submitted; None until then. */
+  var submissionTime: Option[Long] = None
+
   private var nextAttemptId = 0
 
   def isAvailable: Boolean = {
-- 
GitLab