Skip to content
Snippets Groups Projects
Commit f6eb1f08 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #413 from pwendell/stage-logging

SPARK-658: Adding logging of stage duration
parents dda2ce01 7ee824e4
No related branches found
No related tags found
No related merge requests found
......@@ -393,6 +393,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
logDebug("New pending tasks: " + myPending)
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
if (!stage.submissionTime.isDefined) {
stage.submissionTime = Some(System.currentTimeMillis())
}
} else {
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
......@@ -407,6 +410,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
val stage = idToStage(task.stageId)
def markStageAsFinished(stage: Stage) = {
val serviceTime = stage.submissionTime match {
case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
case _ => "Unkown"
}
logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))
running -= stage
}
event.reason match {
case Success =>
logInfo("Completed " + task)
......@@ -421,13 +433,13 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
if (!job.finished(rt.outputId)) {
job.finished(rt.outputId) = true
job.numFinished += 1
job.listener.taskSucceeded(rt.outputId, event.result)
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
activeJobs -= job
resultStageToJob -= stage
running -= stage
markStageAsFinished(stage)
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
......@@ -444,8 +456,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
stage.addOutputLoc(smt.partition, status)
}
if (running.contains(stage) && pendingTasks(stage).isEmpty) {
logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages")
running -= stage
markStageAsFinished(stage)
logInfo("looking for newly runnable stages")
logInfo("running: " + running)
logInfo("waiting: " + waiting)
logInfo("failed: " + failed)
......
......@@ -32,6 +32,9 @@ private[spark] class Stage(
val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
var numAvailableOutputs = 0
/** When first task was submitted to scheduler. */
var submissionTime: Option[Long] = None
private var nextAttemptId = 0
def isAvailable: Boolean = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment