diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 40b30e4d236cf766dc6a6a094e7a64fb26e1054e..1069e27513aaf8c5d80a9136d1ed3bffc2dbfec8 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -547,7 +547,7 @@ class SparkContext( } def addSparkListener(listener: SparkListener) { - dagScheduler.sparkListeners += listener + dagScheduler.addSparkListener(listener) } /** diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 89c51a44c98790c479dbf0260b563189770295bf..fbf3f4c80715f7589a88a387edb25031d0a0236b 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -102,7 +102,7 @@ class DAGScheduler( private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo] - private[spark] val sparkListeners = ArrayBuffer[SparkListener]() + private val listenerBus = new SparkListenerBus() var cacheLocs = new HashMap[Int, Array[List[String]]] @@ -137,6 +137,10 @@ class DAGScheduler( }.start() } + def addSparkListener(listener: SparkListener) { + listenerBus.addListener(listener) + } + private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { if (!cacheLocs.contains(rdd.id)) { val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray @@ -334,7 +338,7 @@ class DAGScheduler( // Compute very short actions like first() or take() with no parent stages locally. runLocally(job) } else { - sparkListeners.foreach(_.onJobStart(SparkListenerJobStart(job, properties))) + listenerBus.post(SparkListenerJobStart(job, properties)) idToActiveJob(runId) = job activeJobs += job resultStageToJob(finalStage) = job @@ -348,11 +352,11 @@ class DAGScheduler( handleExecutorLost(execId) case begin: BeginEvent => - sparkListeners.foreach(_.onTaskStart(SparkListenerTaskStart(begin.task, begin.taskInfo))) + listenerBus.post(SparkListenerTaskStart(begin.task, begin.taskInfo)) case completion: CompletionEvent => - sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task, - completion.reason, completion.taskInfo, completion.taskMetrics))) + listenerBus.post(SparkListenerTaskEnd( + completion.task, completion.reason, completion.taskInfo, completion.taskMetrics)) handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => @@ -363,7 +367,7 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, None)))) + listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, None))) } return true } @@ -513,8 +517,7 @@ class DAGScheduler( // must be run listener before possible NotSerializableException // should be "StageSubmitted" first and then "JobEnded" val properties = idToActiveJob(stage.priority).properties - sparkListeners.foreach(_.onStageSubmitted( - SparkListenerStageSubmitted(stage, tasks.size, properties))) + listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties)) if (tasks.size > 0) { // Preemptively serialize a task to make sure it can be serialized. We are catching this @@ -560,8 +563,7 @@ class DAGScheduler( } logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) stage.completionTime = Some(System.currentTimeMillis) - val stageComp = StageCompleted(stageToInfos(stage)) - sparkListeners.foreach{_.onStageCompleted(stageComp)} + listenerBus.post(StageCompleted(stageToInfos(stage))) running -= stage } event.reason match { @@ -585,7 +587,7 @@ class DAGScheduler( activeJobs -= job resultStageToJob -= stage markStageAsFinished(stage) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobSucceeded))) + listenerBus.post(SparkListenerJobEnd(job, JobSucceeded)) } job.listener.taskSucceeded(rt.outputId, event.result) } @@ -732,7 +734,7 @@ class DAGScheduler( val job = resultStageToJob(resultStage) val error = new SparkException("Job failed: " + reason) job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))) + listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))) idToActiveJob -= resultStage.priority activeJobs -= job resultStageToJob -= resultStage diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala index ad2efcec63706edc222e9f6dc357f6c9eccec2d0..7194fcaa49aa6b97b59963516eac3e6c66d98ac1 100644 --- a/core/src/main/scala/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/spark/scheduler/JobLogger.scala @@ -23,10 +23,11 @@ import java.io.FileNotFoundException import java.text.SimpleDateFormat import java.util.{Date, Properties} import java.util.concurrent.LinkedBlockingQueue + import scala.collection.mutable.{Map, HashMap, ListBuffer} import scala.io.Source + import spark._ -import spark.SparkContext import spark.executor.TaskMetrics import spark.scheduler.cluster.TaskInfo @@ -54,31 +55,6 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { def getJobIDToStages = jobIDToStages def getEventQueue = eventQueue - new Thread("JobLogger") { - setDaemon(true) - override def run() { - while (true) { - val event = eventQueue.take - logDebug("Got event of type " + event.getClass.getName) - event match { - case SparkListenerJobStart(job, properties) => - processJobStartEvent(job, properties) - case SparkListenerStageSubmitted(stage, taskSize, properties) => - processStageSubmittedEvent(stage, taskSize) - case StageCompleted(stageInfo) => - processStageCompletedEvent(stageInfo) - case SparkListenerJobEnd(job, result) => - processJobEndEvent(job, result) - case SparkListenerTaskStart(task, taskInfo) => - processTaskStartEvent(task, taskInfo) - case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) => - processTaskEndEvent(task, reason, taskInfo, taskMetrics) - case _ => - } - } - } - }.start() - // Create a folder for log files, the folder's name is the creation time of the jobLogger protected def createLogDir() { val dir = new File(logDir + "/" + logDirName + "/") @@ -239,49 +215,32 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { } override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { - eventQueue.put(stageSubmitted) - } - - protected def processStageSubmittedEvent(stage: Stage, taskSize: Int) { - stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED" + " TASK_SIZE=" + taskSize) + stageLogInfo( + stageSubmitted.stage.id, + "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format( + stageSubmitted.stage.id, stageSubmitted.taskSize)) } override def onStageCompleted(stageCompleted: StageCompleted) { - eventQueue.put(stageCompleted) - } - - protected def processStageCompletedEvent(stageInfo: StageInfo) { - stageLogInfo(stageInfo.stage.id, "STAGE_ID=" + - stageInfo.stage.id + " STATUS=COMPLETED") + stageLogInfo( + stageCompleted.stageInfo.stage.id, + "STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id)) } - override def onTaskStart(taskStart: SparkListenerTaskStart) { - eventQueue.put(taskStart) - } - - protected def processTaskStartEvent(task: Task[_], taskInfo: TaskInfo) { - var taskStatus = "" - task match { - case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK" - case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK" - } - } + override def onTaskStart(taskStart: SparkListenerTaskStart) { } override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { - eventQueue.put(taskEnd) - } - - protected def processTaskEndEvent(task: Task[_], reason: TaskEndReason, - taskInfo: TaskInfo, taskMetrics: TaskMetrics) { + val task = taskEnd.task + val taskInfo = taskEnd.taskInfo var taskStatus = "" task match { case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK" case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK" } - reason match { + taskEnd.reason match { case Success => taskStatus += " STATUS=SUCCESS" - recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskMetrics) + recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskEnd.taskMetrics) case Resubmitted => taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId + " STAGE_ID=" + task.stageId @@ -300,12 +259,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { } override def onJobEnd(jobEnd: SparkListenerJobEnd) { - eventQueue.put(jobEnd) - } - - protected def processJobEndEvent(job: ActiveJob, reason: JobResult) { + val job = jobEnd.job var info = "JOB_ID=" + job.runId - reason match { + jobEnd.jobResult match { case JobSucceeded => info += " STATUS=SUCCESS" case JobFailed(exception, _) => info += " STATUS=FAILED REASON=" @@ -324,10 +280,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { } override def onJobStart(jobStart: SparkListenerJobStart) { - eventQueue.put(jobStart) - } - - protected def processJobStartEvent(job: ActiveJob, properties: Properties) { + val job = jobStart.job + val properties = jobStart.properties createLogWriter(job.runId) recordJobProperties(job.runId, properties) buildJobDep(job.runId, job.finalStage) diff --git a/core/src/main/scala/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/spark/scheduler/SparkListenerBus.scala new file mode 100644 index 0000000000000000000000000000000000000000..f55ed455ed3f700cc5e6cc7664c1a7d39d40699b --- /dev/null +++ b/core/src/main/scala/spark/scheduler/SparkListenerBus.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.scheduler + +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} + +import spark.Logging + +/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */ +private[spark] class SparkListenerBus() extends Logging { + private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener] + + /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than + * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ + private val EVENT_QUEUE_CAPACITY = 10000 + private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY) + private var queueFullErrorMessageLogged = false + + new Thread("SparkListenerBus") { + setDaemon(true) + override def run() { + while (true) { + val event = eventQueue.take + event match { + case stageSubmitted: SparkListenerStageSubmitted => + sparkListeners.foreach(_.onStageSubmitted(stageSubmitted)) + case stageCompleted: StageCompleted => + sparkListeners.foreach(_.onStageCompleted(stageCompleted)) + case jobStart: SparkListenerJobStart => + sparkListeners.foreach(_.onJobStart(jobStart)) + case jobEnd: SparkListenerJobEnd => + sparkListeners.foreach(_.onJobEnd(jobEnd)) + case taskStart: SparkListenerTaskStart => + sparkListeners.foreach(_.onTaskStart(taskStart)) + case taskEnd: SparkListenerTaskEnd => + sparkListeners.foreach(_.onTaskEnd(taskEnd)) + case _ => + } + } + } + }.start() + + def addListener(listener: SparkListener) { + sparkListeners += listener + } + + def post(event: SparkListenerEvents) { + val eventAdded = eventQueue.offer(event) + if (!eventAdded && !queueFullErrorMessageLogged) { + logError("Dropping SparkListenerEvent because no remaining room in event queue. " + + "This likely means one of the SparkListeners is too slow and cannot keep up with the " + + "rate at which tasks are being started by the scheduler.") + queueFullErrorMessageLogged = true + } + } +} + diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala index 117b84e6158fc882e8b3d5f1df3a10e841240baa..9724671a03225f0d27d0323f7dc1dbb0c45171c6 100644 --- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala @@ -31,73 +31,63 @@ private[spark] class IndexPage(parent: JobProgressUI) { def listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { - val activeStages = listener.activeStages.toSeq - val completedStages = listener.completedStages.reverse.toSeq - val failedStages = listener.failedStages.reverse.toSeq - val now = System.currentTimeMillis() + listener.synchronized { + val activeStages = listener.activeStages.toSeq + val completedStages = listener.completedStages.reverse.toSeq + val failedStages = listener.failedStages.reverse.toSeq + val now = System.currentTimeMillis() - var activeTime = 0L - for (tasks <- listener.stageToTasksActive.values; t <- tasks) { - activeTime += t.timeRunning(now) - } + var activeTime = 0L + for (tasks <- listener.stageToTasksActive.values; t <- tasks) { + activeTime += t.timeRunning(now) + } - val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) - val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent) - val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent) + val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) + val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent) + val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent) - val poolTable = new PoolTable(listener.sc.getAllPools, listener) - val summary: NodeSeq = - <div> - <ul class="unstyled"> - <li> - <strong>Duration: </strong> - {parent.formatDuration(now - listener.sc.startTime)} - </li> - <li> - <strong>CPU time: </strong> - {parent.formatDuration(listener.totalTime + activeTime)} - </li> - {if (listener.totalShuffleRead > 0) + val poolTable = new PoolTable(listener.sc.getAllPools, listener) + val summary: NodeSeq = + <div> + <ul class="unstyled"> + <li> + <strong>Duration: </strong> + {parent.formatDuration(now - listener.sc.startTime)} + </li> + <li> + <strong>CPU Time: </strong> + {parent.formatDuration(listener.totalTime + activeTime)} + </li> + <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li> <li> - <strong>Shuffle read: </strong> - {Utils.memoryBytesToString(listener.totalShuffleRead)} - </li> - } - {if (listener.totalShuffleWrite > 0) + <a href="#active"><strong>Active Stages:</strong></a> + {activeStages.size} + </li> <li> - <strong>Shuffle write: </strong> - {Utils.memoryBytesToString(listener.totalShuffleWrite)} - </li> - } - <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li> - <li> - <a href="#active"><strong>Active Stages:</strong></a> - {activeStages.size} - </li> - <li> - <a href="#completed"><strong>Completed Stages:</strong></a> - {completedStages.size} - </li> - <li> - <a href="#failed"><strong>Failed Stages:</strong></a> - {failedStages.size} - </li> - </ul> - </div> + <a href="#completed"><strong>Completed Stages:</strong></a> + {completedStages.size} + </li> + <li> + <a href="#failed"><strong>Failed Stages:</strong></a> + {failedStages.size} + </li> + </ul> + </div> - val content = summary ++ - {if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) { - <h4>Pools</h4> ++ poolTable.toNodeSeq - } else { - Seq() - }} ++ - <h4 id="active">Active Stages : {activeStages.size}</h4> ++ - activeStagesTable.toNodeSeq++ - <h4 id="completed">Completed Stages : {completedStages.size}</h4> ++ - completedStagesTable.toNodeSeq++ - <h4 id ="failed">Failed Stages : {failedStages.size}</h4> ++ - failedStagesTable.toNodeSeq + val content = summary ++ + {if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) { + <h4>Pools</h4> ++ poolTable.toNodeSeq + } else { + Seq() + }} ++ + <h4 id="active">Active Stages: {activeStages.size}</h4> ++ + activeStagesTable.toNodeSeq++ + <h4 id="completed">Completed Stages: {completedStages.size}</h4> ++ + completedStagesTable.toNodeSeq++ + <h4 id ="failed">Failed Stages: {failedStages.size}</h4> ++ + failedStagesTable.toNodeSeq - headerSparkPage(content, parent.sc, "Spark Stages", Jobs) + headerSparkPage(content, parent.sc, "Spark Stages", Jobs) + } } } diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala index c6103edcb0f180482d581b6343adf8a59497b704..1d9767a83c8f65a75a7897eee406ae19a91b12f9 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala @@ -9,6 +9,13 @@ import spark.scheduler.cluster.TaskInfo import spark.executor.TaskMetrics import collection.mutable +/** + * Tracks task-level information to be displayed in the UI. + * + * All access to the data structures in this class must be synchronized on the + * class, since the UI thread and the DAGScheduler event loop may otherwise + * be reading/updating the internal data structures concurrently. + */ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener { // How many stages to remember val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt @@ -38,7 +45,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList override def onJobStart(jobStart: SparkListenerJobStart) {} - override def onStageCompleted(stageCompleted: StageCompleted) = { + override def onStageCompleted(stageCompleted: StageCompleted) = synchronized { val stage = stageCompleted.stageInfo.stage poolToActiveStages(stageToPool(stage)) -= stage activeStages -= stage @@ -47,7 +54,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } /** If stages is too large, remove and garbage collect old stages */ - def trimIfNecessary(stages: ListBuffer[Stage]) { + def trimIfNecessary(stages: ListBuffer[Stage]) = synchronized { if (stages.size > RETAINED_STAGES) { val toRemove = RETAINED_STAGES / 10 stages.takeRight(toRemove).foreach( s => { @@ -66,7 +73,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } /** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */ - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = { + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized { val stage = stageSubmitted.stage activeStages += stage @@ -84,7 +91,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stages += stage } - override def onTaskStart(taskStart: SparkListenerTaskStart) { + override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { val sid = taskStart.task.stageId val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) tasksActive += taskStart.taskInfo @@ -94,7 +101,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stageToTaskInfos(sid) = taskList } - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val sid = taskEnd.task.stageId val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) tasksActive -= taskEnd.taskInfo @@ -132,7 +139,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stageToTaskInfos(sid) = taskList } - override def onJobEnd(jobEnd: SparkListenerJobEnd) { + override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized { jobEnd match { case end: SparkListenerJobEnd => end.jobResult match { @@ -146,22 +153,4 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList case _ => } } - - /** Is this stage's input from a shuffle read. */ - def hasShuffleRead(stageID: Int): Boolean = { - // This is written in a slightly complicated way to avoid having to scan all tasks - for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) { - if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined - } - return false // No tasks have finished for this stage - } - - /** Is this stage's output to a shuffle write. */ - def hasShuffleWrite(stageID: Int): Boolean = { - // This is written in a slightly complicated way to avoid having to scan all tasks - for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) { - if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined - } - return false // No tasks have finished for this stage - } } diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala index 647c6d2ae3edb836e75f806073d1141c02dee0ea..04ef35c8008a507e6478aa29b0d9be9f9daf5d04 100644 --- a/core/src/main/scala/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala @@ -14,17 +14,19 @@ private[spark] class PoolPage(parent: JobProgressUI) { def listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { - val poolName = request.getParameter("poolname") - val poolToActiveStages = listener.poolToActiveStages - val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq - val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) + listener.synchronized { + val poolName = request.getParameter("poolname") + val poolToActiveStages = listener.poolToActiveStages + val activeStages = poolToActiveStages.get(poolName).toSeq.flatten + val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) - val pool = listener.sc.getPoolForName(poolName).get - val poolTable = new PoolTable(Seq(pool), listener) + val pool = listener.sc.getPoolForName(poolName).get + val poolTable = new PoolTable(Seq(pool), listener) - val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++ - <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq() + val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++ + <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq() - headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs) + headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs) + } } } diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala index 9cfe0d68f0eb6f921e5b70d2c92be07023ec6594..21ebcef63aa4f2af91f725ba2fefcb3508786054 100644 --- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala @@ -13,11 +13,12 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages def toNodeSeq(): Seq[Node] = { - poolTable(poolRow, pools) + listener.synchronized { + poolTable(poolRow, pools) + } } - // pool tables - def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node], + private def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node], rows: Seq[Schedulable] ): Seq[Node] = { <table class="table table-bordered table-striped table-condensed sortable"> @@ -35,12 +36,16 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis </table> } - def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = { + private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = { + val activeStages = poolToActiveStages.get(p.name) match { + case Some(stages) => stages.size + case None => 0 + } <tr> <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td> <td>{p.minShare}</td> <td>{p.weight}</td> - <td>{poolToActiveStages.getOrElseUpdate(p.name, new HashSet[Stage]()).size}</td> + <td>{activeStages}</td> <td>{p.runningTasks}</td> <td>{p.schedulingMode}</td> </tr> diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala index 02f9adf8a8fea7fcd3c7db6908c5b3e41a774a6b..1b071a91e55584cbd54865aa835b54b5eefb9ca6 100644 --- a/core/src/main/scala/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -36,100 +36,102 @@ private[spark] class StagePage(parent: JobProgressUI) { val dateFmt = parent.dateFmt def render(request: HttpServletRequest): Seq[Node] = { - val stageId = request.getParameter("id").toInt - val now = System.currentTimeMillis() + listener.synchronized { + val stageId = request.getParameter("id").toInt + val now = System.currentTimeMillis() + + if (!listener.stageToTaskInfos.contains(stageId)) { + val content = + <div> + <h4>Summary Metrics</h4> No tasks have started yet + <h4>Tasks</h4> No tasks have started yet + </div> + return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs) + } - if (!listener.stageToTaskInfos.contains(stageId)) { - val content = - <div> - <h4>Summary Metrics</h4> No tasks have started yet - <h4>Tasks</h4> No tasks have started yet - </div> - return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs) - } + val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime) - val tasks = listener.stageToTaskInfos(stageId).toSeq + val shuffleReadBytes = listener.stageToShuffleRead.getOrElse(stageId, 0L) + val hasShuffleRead = shuffleReadBytes > 0 + val shuffleWriteBytes = listener.stageToShuffleWrite.getOrElse(stageId, 0L) + val hasShuffleWrite = shuffleWriteBytes > 0 - val shuffleRead = listener.stageToShuffleRead(stageId) > 0 - val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0 + var activeTime = 0L + listener.stageToTasksActive(stageId).foreach(activeTime += _.timeRunning(now)) - var activeTime = 0L - listener.stageToTasksActive(stageId).foreach { t => - activeTime += t.timeRunning(now) - } - - val summary = - <div> - <ul class="unstyled"> - <li> - <strong>CPU time: </strong> - {parent.formatDuration(listener.stageToTime(stageId) + activeTime)} - </li> - {if (shuffleRead) - <li> - <strong>Shuffle read: </strong> - {Utils.memoryBytesToString(listener.stageToShuffleRead(stageId))} - </li> - } - {if (shuffleWrite) + val summary = + <div> + <ul class="unstyled"> <li> - <strong>Shuffle write: </strong> - {Utils.memoryBytesToString(listener.stageToShuffleWrite(stageId))} + <strong>CPU time: </strong> + {parent.formatDuration(listener.stageToTime.getOrElse(stageId, 0L) + activeTime)} </li> - } - </ul> - </div> + {if (hasShuffleRead) + <li> + <strong>Shuffle read: </strong> + {Utils.memoryBytesToString(shuffleReadBytes)} + </li> + } + {if (hasShuffleWrite) + <li> + <strong>Shuffle write: </strong> + {Utils.memoryBytesToString(shuffleWriteBytes)} + </li> + } + </ul> + </div> - val taskHeaders: Seq[String] = - Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++ - {if (shuffleRead) Seq("Shuffle Read") else Nil} ++ - {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++ - Seq("Details") + val taskHeaders: Seq[String] = + Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++ + {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ + {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++ + Seq("Details") - val taskTable = listingTable(taskHeaders, taskRow, tasks) + val taskTable = listingTable(taskHeaders, taskRow, tasks) - // Excludes tasks which failed and have incomplete metrics - val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (Option(t._2).isDefined)) + // Excludes tasks which failed and have incomplete metrics + val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined)) - val summaryTable: Option[Seq[Node]] = - if (validTasks.size == 0) { - None - } - else { - val serviceTimes = validTasks.map{case (info, metrics, exception) => - metrics.get.executorRunTime.toDouble} - val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map( - ms => parent.formatDuration(ms.toLong)) - - def getQuantileCols(data: Seq[Double]) = - Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong)) - - val shuffleReadSizes = validTasks.map { - case(info, metrics, exception) => - metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble + val summaryTable: Option[Seq[Node]] = + if (validTasks.size == 0) { + None } - val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes) + else { + val serviceTimes = validTasks.map{case (info, metrics, exception) => + metrics.get.executorRunTime.toDouble} + val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map( + ms => parent.formatDuration(ms.toLong)) + + def getQuantileCols(data: Seq[Double]) = + Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong)) + + val shuffleReadSizes = validTasks.map { + case(info, metrics, exception) => + metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble + } + val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes) - val shuffleWriteSizes = validTasks.map { - case(info, metrics, exception) => - metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble - } - val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) + val shuffleWriteSizes = validTasks.map { + case(info, metrics, exception) => + metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble + } + val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) - val listings: Seq[Seq[String]] = Seq(serviceQuantiles, - if (shuffleRead) shuffleReadQuantiles else Nil, - if (shuffleWrite) shuffleWriteQuantiles else Nil) + val listings: Seq[Seq[String]] = Seq(serviceQuantiles, + if (hasShuffleRead) shuffleReadQuantiles else Nil, + if (hasShuffleWrite) shuffleWriteQuantiles else Nil) - val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max") - def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr> - Some(listingTable(quantileHeaders, quantileRow, listings)) - } + val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max") + def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr> + Some(listingTable(quantileHeaders, quantileRow, listings)) + } - val content = - summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ - <h2>Tasks</h2> ++ taskTable; + val content = + summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ + <h2>Tasks</h2> ++ taskTable; - headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs) + headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs) + } } diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala index 1df0e0913c6755cee8ecc5d5cf7b146a65fd8d7c..5068a025fa2fe6d65ef336707f0f9e37ea4178db 100644 --- a/core/src/main/scala/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala @@ -25,11 +25,13 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR def toNodeSeq(): Seq[Node] = { - stageTable(stageRow, stages) + listener.synchronized { + stageTable(stageRow, stages) + } } /** Special table which merges two header cells. */ - def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { + private def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { <table class="table table-bordered table-striped table-condensed sortable"> <thead> <th>Stage Id</th> @@ -47,14 +49,14 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU </table> } - def getElapsedTime(submitted: Option[Long], completed: Long): String = { + private def getElapsedTime(submitted: Option[Long], completed: Long): String = { submitted match { case Some(t) => parent.formatDuration(completed - t) case _ => "Unknown" } } - def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = { + private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = { val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) val startWidth = "width: %s%%".format((started.toDouble/total)*100) @@ -68,7 +70,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU } - def stageRow(s: Stage): Seq[Node] = { + private def stageRow(s: Stage): Seq[Node] = { val submissionTime = s.submissionTime match { case Some(t) => dateFmt.format(new Date(t)) case None => "Unknown"