diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index f10d7cc84ebcba07811ec25fe4200e82ee882d8a..b130be6a3863a005cee230fee87deea7ce3962e7 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -393,6 +393,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       logDebug("New pending tasks: " + myPending)
       taskSched.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
+      if (!stage.submissionTime.isDefined) {
+        stage.submissionTime = Some(System.currentTimeMillis())
+      }
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -407,6 +410,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   def handleTaskCompletion(event: CompletionEvent) {
     val task = event.task
     val stage = idToStage(task.stageId)
+
+    def markStageAsFinished(stage: Stage) = {
+      val serviceTime = stage.submissionTime match {
+        case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
+        case _ => "Unkown"
+      }
+      logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))
+      running -= stage
+    }
     event.reason match {
       case Success =>
         logInfo("Completed " + task)
@@ -421,13 +433,13 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
                 if (!job.finished(rt.outputId)) {
                   job.finished(rt.outputId) = true
                   job.numFinished += 1
-                  job.listener.taskSucceeded(rt.outputId, event.result)
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
                     activeJobs -= job
                     resultStageToJob -= stage
-                    running -= stage
+                    markStageAsFinished(stage)
                   }
+                  job.listener.taskSucceeded(rt.outputId, event.result)
                 }
               case None =>
                 logInfo("Ignoring result from " + rt + " because its job has finished")
@@ -444,8 +456,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
               stage.addOutputLoc(smt.partition, status)
             }
             if (running.contains(stage) && pendingTasks(stage).isEmpty) {
-              logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages")
-              running -= stage
+              markStageAsFinished(stage)
+              logInfo("looking for newly runnable stages")
               logInfo("running: " + running)
               logInfo("waiting: " + waiting)
               logInfo("failed: " + failed)
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index e9419728e3f34c98711703772c0b528ffb611ebd..374114d87034cf1851ade92c36fd42bbc72c6e82 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -32,6 +32,9 @@ private[spark] class Stage(
   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
   var numAvailableOutputs = 0
 
+  /** When first task was submitted to scheduler. */
+  var submissionTime: Option[Long] = None
+
   private var nextAttemptId = 0
 
   def isAvailable: Boolean = {