diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index f62ae374663a7cb662dc569ee4805faac0b45472..a31a7e1d58374568b84e75572efb76028d3404df 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -56,7 +56,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_) val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used", - "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Duration", "Shuffle Read", + "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read", "Shuffle Write") def execRow(kv: Seq[String]) = { @@ -169,21 +169,13 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { // update shuffle read/write if (null != taskEnd.taskMetrics) { - val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead - executorToShuffleRead.put(eid, newShuffleRead) - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten - executorToShuffleWrite.put(eid, newShuffleWrite) - } - case _ => {} - } + taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead => + executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) + + shuffleRead.remoteBytesRead)) + + taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite => + executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) + + shuffleWrite.shuffleBytesWritten)) } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala index 75c0dd2c7fe5c750c241973da1e1feba91e8118b..3c53e88380193daa24d6090c16ef306e77ca1de9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -17,8 +17,9 @@ package org.apache.spark.ui.jobs -private[spark] class ExecutorSummary() { - var duration : Long = 0 +/** class for reporting aggregated metrics for each executor in stageUI */ +private[spark] class ExecutorSummary { + var taskTime : Long = 0 var failedTasks : Int = 0 var succeededTasks : Int = 0 var shuffleRead : Long = 0 diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 763d5a344b247b31fb03521a7f09af6a8a449547..0e9dd4a8c71f7f4caf3e85fab4b144cb4bfeafb8 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -40,7 +40,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) <table class="table table-bordered table-striped table-condensed sortable"> <thead> <th>Executor ID</th> - <th>Duration</th> + <th>Task Time</th> <th>Total Tasks</th> <th>Failed Tasks</th> <th>Succeeded Tasks</th> @@ -61,7 +61,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) case (k,v) => { <tr> <td>{k}</td> - <td>{parent.formatDuration(v.duration)}</td> + <td>{parent.formatDuration(v.taskTime)}</td> <td>{v.failedTasks + v.succeededTasks}</td> <td>{v.failedTasks}</td> <td>{v.succeededTasks}</td> diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 854afb665afe00ffe1ff57d62c28b2e83c0fc27d..ca5a28625b7de522c1975c8381282e8677481ad7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++
b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -56,10 +56,6 @@ private[spark] class IndexPage(parent: JobProgressUI) { {parent.formatDuration(now - listener.sc.startTime)} </li> <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li> - <li> - <a href="#executors"><strong>Executor Summary:</strong></a> - {listener.stageIdToExecutorSummaries.size} - </li> <li> <a href="#active"><strong>Active Stages:</strong></a> {activeStages.size} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 64ce7159938990ac10061189c2db53d36dc6a0fb..07a42f05035b30d65c16df1272a93949f23cca07 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -144,23 +144,14 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } // update duration - y.duration += taskEnd.taskInfo.duration - - // update shuffle read/write - if (null != taskEnd.taskMetrics) { - val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - y.shuffleRead += s.remoteBytesRead - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - y.shuffleWrite += s.shuffleBytesWritten - } - case _ => {} - } + y.taskTime += taskEnd.taskInfo.duration + + taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead => + y.shuffleRead += shuffleRead.remoteBytesRead + } + + taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite => + y.shuffleWrite += shuffleWrite.shuffleBytesWritten } } case _ => {} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index c077613b1d9c8269e418b0e88eab29dab0b55ed6..d8a6c9e2dccddcea6f17985a6bedad322b649e01 100644 --- 
a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) { <div> <ul class="unstyled"> <li> - <strong>Total duration across all tasks: </strong> + <strong>Total task time across all tasks: </strong> {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)} </li> {if (hasShuffleRead) @@ -163,9 +163,9 @@ private[spark] class StagePage(parent: JobProgressUI) { val executorTable = new ExecutorTable(parent, stageId) val content = summary ++ - <h4>Summary Metrics for Executors</h4> ++ executorTable.toNodeSeq() ++ <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++ <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++ + <h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++ <h4>Tasks</h4> ++ taskTable headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 9ad6de3c6d8de79c758f1d0764b1a171bd56012c..463d85dfd54fdf79ddc15510b301c2a3ab8ff297 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr {if (isFairScheduler) {<th>Pool Name</th>} else {}} <th>Description</th> <th>Submitted</th> - <th>Duration</th> + <th>Task Time</th> <th>Tasks: Succeeded/Total</th> <th>Shuffle Read</th> <th>Shuffle Write</th>