diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index bce7418e874854289fc04742ba6e90650e39106b..7ba1f3430a15a1076381d476a57810f897288f4c 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -86,7 +86,6 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val activeJobs = new HashSet[ActiveJob] val resultStageToJob = new HashMap[Stage, ActiveJob] - val stageSubmissionTimes = new HashMap[Stage, Long] val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup) @@ -394,8 +393,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with logDebug("New pending tasks: " + myPending) taskSched.submitTasks( new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority)) - if (!stageSubmissionTimes.contains(stage)) { - stageSubmissionTimes.put(stage, System.currentTimeMillis()) + if (!stage.submissionTime.isDefined) { + stage.submissionTime = Some(System.currentTimeMillis()) } } else { logDebug("Stage " + stage + " is actually done; %b %d %d".format( @@ -413,7 +412,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val stage = idToStage(task.stageId) def markStageAsFinished(stage: Stage) = { - val serviceTime = stageSubmissionTimes.remove(stage) match { + val serviceTime = stage.submissionTime match { case Some(t) => (System.currentTimeMillis() - t).toString case _ => "Unkown" } diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index e9419728e3f34c98711703772c0b528ffb611ebd..374114d87034cf1851ade92c36fd42bbc72c6e82 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -32,6 +32,9 @@ private[spark] class Stage( val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 + /** When first task was submitted to scheduler. */ + var submissionTime: Option[Long] = None + private var nextAttemptId = 0 def isAvailable: Boolean = {