From 3d3f8c8004da110ca97973119e9d9f04f878ee81 Mon Sep 17 00:00:00 2001
From: CrazyJvm <crazyjvm@gmail.com>
Date: Thu, 5 Jun 2014 17:44:46 -0700
Subject: [PATCH] Use pluggable clock in DAGScheduler #SPARK-2031

DAGScheduler now accepts a pluggable clock, as TaskSetManager already does, so tests can inject a controllable time source (see the sketch below).

Author: CrazyJvm <crazyjvm@gmail.com>

Closes #976 from CrazyJvm/clock and squashes the following commits:

6779a4c [CrazyJvm] Use pluggable clock in DAGScheduler
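
A minimal sketch (not part of this patch) of how the new constructor parameter
could be exercised in a test. ManualClock is a hypothetical fake that only needs
to satisfy the getTime() contract of org.apache.spark.util.Clock used in the diff
below; since the Clock trait may be package-private, such a class would live
inside the org.apache.spark package.

    import org.apache.spark.util.Clock

    // Hypothetical fake clock for tests: time advances only when the test says so.
    class ManualClock(private var time: Long = 0L) extends Clock {
      override def getTime(): Long = time
      def advance(millis: Long): Unit = { time += millis }
    }

    // A test could then construct the scheduler with
    //   new DAGScheduler(..., clock = new ManualClock())
    // instead of relying on the SystemClock default, making stage submission
    // and completion timestamps deterministic, e.g.
    //   clock.advance(1000)  // reported stage service time becomes exactly 1.000 s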
---
 .../org/apache/spark/scheduler/DAGScheduler.scala   | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ccff6a3d1a..e09a4221e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SystemClock, Clock, Utils}
 
 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -61,7 +61,8 @@ class DAGScheduler(
     listenerBus: LiveListenerBus,
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
-    env: SparkEnv)
+    env: SparkEnv,
+    clock: Clock = SystemClock)
   extends Logging {
 
   import DAGScheduler._
@@ -781,7 +782,7 @@ class DAGScheduler(
       logDebug("New pending tasks: " + myPending)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
+      stageToInfos(stage).submissionTime = Some(clock.getTime())
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -807,11 +808,11 @@ class DAGScheduler(
 
     def markStageAsFinished(stage: Stage) = {
       val serviceTime = stageToInfos(stage).submissionTime match {
-        case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
+        case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
         case _ => "Unknown"
       }
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-      stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
+      stageToInfos(stage).completionTime = Some(clock.getTime())
       listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
       runningStages -= stage
     }
@@ -1015,7 +1016,7 @@ class DAGScheduler(
       return
     }
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
-    stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis())
+    stageToInfos(failedStage).completionTime = Some(clock.getTime())
     for (resultStage <- dependentStages) {
       val job = resultStageToJob(resultStage)
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason",
-- 
GitLab