diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index d2b63d6e0df232073ba8b415ce2a87f0c7b1b725..7a1089c816a2c88b22c0f2cd3816b5dc791f5590 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -67,8 +67,8 @@ private[spark] case object RequestMasterState
 // Master to MasterWebUI
 
 private[spark]
-case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
-  completedJobs: List[JobInfo])
+case class MasterState(uri: String, workers: Array[WorkerInfo], activeJobs: Array[JobInfo],
+  completedJobs: Array[JobInfo])
 
 // WorkerWebUI to Worker
 private[spark] case object RequestWorkerState
@@ -78,4 +78,4 @@ private[spark] case object RequestWorkerState
 private[spark]
 case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
   finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
-  coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
\ No newline at end of file
+  coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
diff --git a/core/src/main/scala/spark/deploy/WebUI.scala b/core/src/main/scala/spark/deploy/WebUI.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ad1a1092b2ebd137cef52ba2355dea8c1e72eef5
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/WebUI.scala
@@ -0,0 +1,30 @@
+package spark.deploy
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+/**
+ * Utilities used throughout the web UI.
+ */
+private[spark] object WebUI {
+  val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+
+  def formatDate(date: Date): String = DATE_FORMAT.format(date)
+
+  def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
+
+  def formatDuration(milliseconds: Long): String = {
+    val seconds = milliseconds.toDouble / 1000
+    if (seconds < 60) {
+      return "%.0f s".format(seconds)
+    }
+    val minutes = seconds / 60
+    if (minutes < 10) {
+      return "%.1f min".format(minutes)
+    } else if (minutes < 60) {
+      return "%.0f min".format(minutes)
+    }
+    val hours = minutes / 60
+    return "%.1f h".format(hours)
+  }
+}
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 8795c09cc17b57383c46a946347e9d36a551e1ab..130b031a2af6cea087b6c2ffb995103ffa7beb40 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -5,11 +5,17 @@ import java.util.Date
 import akka.actor.ActorRef
 import scala.collection.mutable
 
-private[spark]
-class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
+private[spark] class JobInfo(
+    val startTime: Long,
+    val id: String,
+    val desc: JobDescription,
+    val submitDate: Date,
+    val actor: ActorRef)
+{
   var state = JobState.WAITING
   var executors = new mutable.HashMap[Int, ExecutorInfo]
   var coresGranted = 0
+  var endTime = -1L
 
   private var nextExecutorId = 0
 
@@ -41,4 +47,17 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
     _retryCount += 1
     _retryCount
   }
+
+  def markFinished(endState: JobState.Value) {
+    state = endState
+    endTime = System.currentTimeMillis()
+  }
+
+  def duration: Long = {
+    if (endTime != -1) {
+      endTime - startTime
+    } else {
+      System.currentTimeMillis() - startTime
+    }
+  }
 }
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 6010f7cff2943deb8844d6635cd083158c6d957f..5ef7411f4d1dcb9c571411852c75afa0be936279 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -123,7 +123,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     }
 
     case RequestMasterState => {
-      sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
+      sender ! MasterState(ip + ":" + port, workers.toArray, jobs.toArray, completedJobs.toArray)
     }
   }
 
@@ -179,8 +179,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   }
 
   def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
-    val date = new Date
-    val job = new JobInfo(newJobId(date), desc, date, actor)
+    val now = System.currentTimeMillis()
+    val date = new Date(now)
+    val job = new JobInfo(now, newJobId(date), desc, date, actor)
     jobs += job
     idToJob(job.id) = job
     actorToJob(sender) = job
@@ -189,19 +190,21 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   }
 
   def removeJob(job: JobInfo) {
-    logInfo("Removing job " + job.id)
-    jobs -= job
-    idToJob -= job.id
-    actorToJob -= job.actor
-    addressToWorker -= job.actor.path.address
-    completedJobs += job // Remember it in our history
-    waitingJobs -= job
-    for (exec <- job.executors.values) {
-      exec.worker.removeExecutor(exec)
-      exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
+    if (jobs.contains(job)) {
+      logInfo("Removing job " + job.id)
+      jobs -= job
+      idToJob -= job.id
+      actorToJob -= job.actor
+      addressToWorker -= job.actor.path.address
+      completedJobs += job // Remember it in our history
+      waitingJobs -= job
+      for (exec <- job.executors.values) {
+        exec.worker.removeExecutor(exec)
+        exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
+      }
+      job.markFinished(JobState.FINISHED) // TODO: Mark it as FAILED if it failed
+      schedule()
     }
-    job.state = JobState.FINISHED
-    schedule()
   }
 
   /** Generate a new job ID given a job's submission date */
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 700a41c77066ab14f9967e8dd8595d22b5961c15..3cdd3721f56cfb5cc6de88d51b9c48ffcc50315c 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -36,7 +36,7 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
 
           // A bit ugly an inefficient, but we won't have a number of jobs
          // so large that it will make a significant difference.
-          (masterState.activeJobs ::: masterState.completedJobs).find(_.id == jobId) match {
+          (masterState.activeJobs ++ masterState.completedJobs).find(_.id == jobId) match {
            case Some(job) => spark.deploy.master.html.job_details.render(job)
            case _ => null
          }
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 474c9364fd67cb67950d2a81c86a9bd385f75fc8..67d41dda29f1d65e8424f355a44e90671e6416f2 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -123,7 +123,7 @@ private[spark] class Worker(
       manager.start()
       coresUsed += cores_
       memoryUsed += memory_
-      master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None)
+      master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None)
 
     case ExecutorStateChanged(jobId, execId, state, message) =>
       master ! ExecutorStateChanged(jobId, execId, state, message)
diff --git a/core/src/main/twirl/spark/deploy/master/index.scala.html b/core/src/main/twirl/spark/deploy/master/index.scala.html
index 7562076b0087aad444baf73e109fa8ed4b92d4d3..18c32e5a1f094b35c8dd0107eeadffcd8f09a63a 100644
--- a/core/src/main/twirl/spark/deploy/master/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/index.scala.html
@@ -1,5 +1,6 @@
 @(state: spark.deploy.MasterState)
 @import spark.deploy.master._
+@import spark.Utils
 
 @spark.deploy.common.html.layout(title = "Spark Master on " + state.uri) {
 
@@ -8,9 +9,11 @@
     <div class="span12">
       <ul class="unstyled">
         <li><strong>URL:</strong> spark://@(state.uri)</li>
-        <li><strong>Number of Workers:</strong> @state.workers.size </li>
-        <li><strong>Cores:</strong> @state.workers.map(_.cores).sum Total, @state.workers.map(_.coresUsed).sum Used</li>
-        <li><strong>Memory:</strong> @state.workers.map(_.memory).sum Total, @state.workers.map(_.memoryUsed).sum Used</li>
+        <li><strong>Workers:</strong> @state.workers.size </li>
+        <li><strong>Cores:</strong> @{state.workers.map(_.cores).sum} Total,
+          @{state.workers.map(_.coresUsed).sum} Used</li>
+        <li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
+          @{Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
         <li><strong>Jobs:</strong> @state.activeJobs.size Running, @state.completedJobs.size Completed </li>
       </ul>
     </div>
@@ -21,7 +24,7 @@
     <div class="span12">
       <h3> Cluster Summary </h3>
       <br/>
-      @worker_table(state.workers)
+      @worker_table(state.workers.sortBy(_.id))
     </div>
   </div>
 
@@ -32,7 +35,7 @@
     <div class="span12">
       <h3> Running Jobs </h3>
       <br/>
-      @job_table(state.activeJobs)
+      @job_table(state.activeJobs.sortBy(_.startTime).reverse)
     </div>
   </div>
 
@@ -43,7 +46,7 @@
     <div class="span12">
       <h3> Completed Jobs </h3>
       <br/>
-      @job_table(state.completedJobs)
+      @job_table(state.completedJobs.sortBy(_.endTime).reverse)
     </div>
   </div>
 
diff --git a/core/src/main/twirl/spark/deploy/master/job_row.scala.html b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
index 7c4865bb6ef4e0c9c856b88b7568f143cde2ebe3..fff7953e7d0e0e3498c9cb0e9a40ae64b62415ac 100644
--- a/core/src/main/twirl/spark/deploy/master/job_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
@@ -1,5 +1,9 @@
 @(job: spark.deploy.master.JobInfo)
 
+@import spark.Utils
+@import spark.deploy.WebUI.formatDate
+@import spark.deploy.WebUI.formatDuration
+
 <tr>
   <td>
     <a href="job?jobId=@(job.id)">@job.id</a>
@@ -13,8 +17,9 @@
       , @job.coresLeft
     }
   </td>
-  <td>@job.desc.memoryPerSlave</td>
-  <td>@job.submitDate</td>
+  <td>@Utils.memoryMegabytesToString(job.desc.memoryPerSlave)</td>
+  <td>@formatDate(job.submitDate)</td>
   <td>@job.desc.user</td>
   <td>@job.state.toString()</td>
-</tr>
\ No newline at end of file
+  <td>@formatDuration(job.duration)</td>
+</tr>
diff --git a/core/src/main/twirl/spark/deploy/master/job_table.scala.html b/core/src/main/twirl/spark/deploy/master/job_table.scala.html
index 52bad6c4b82bdc4849b13e5c835c8f415d87b1c3..d267d6e85e0b7acb73a120c81fe71692e59d7dce 100644
--- a/core/src/main/twirl/spark/deploy/master/job_table.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_table.scala.html
@@ -1,4 +1,4 @@
-@(jobs: List[spark.deploy.master.JobInfo])
+@(jobs: Array[spark.deploy.master.JobInfo])
 
 <table class="table table-bordered table-striped table-condensed sortable">
   <thead>
@@ -6,10 +6,11 @@
       <th>JobID</th>
       <th>Description</th>
       <th>Cores</th>
-      <th>Memory per Slave</th>
-      <th>Submit Date</th>
+      <th>Memory per Node</th>
+      <th>Submit Time</th>
       <th>User</th>
       <th>State</th>
+      <th>Duration</th>
     </tr>
   </thead>
   <tbody>
@@ -17,4 +18,4 @@
     @job_row(j)
   }
   </tbody>
-</table>
\ No newline at end of file
+</table>
diff --git a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
index 017cc4859e0e80a1f59c3002bf0d810d77c5fd9b..3dcba3a5457f1d53d10a80ca6b80f7d8ffdea59f 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
@@ -1,11 +1,13 @@
 @(worker: spark.deploy.master.WorkerInfo)
 
+@import spark.Utils
+
 <tr>
   <td>
     <a href="http://@worker.host:@worker.webUiPort">@worker.id</href>
   </td>
   <td>@{worker.host}:@{worker.port}</td>
   <td>@worker.cores (@worker.coresUsed Used)</td>
-  <td>@{spark.Utils.memoryMegabytesToString(worker.memory)}
-    (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
+  <td>@{Utils.memoryMegabytesToString(worker.memory)}
+    (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
 </tr>
diff --git a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
index 2028842297f71c0b5294f5b19cf4e642c75d2dc2..fad1af41dc0619cb9baeed01d9d5a8e7f3f204ac 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
@@ -1,4 +1,4 @@
-@(workers: List[spark.deploy.master.WorkerInfo])
+@(workers: Array[spark.deploy.master.WorkerInfo])
 
 <table class="table table-bordered table-striped table-condensed sortable">
   <thead>
@@ -14,4 +14,4 @@
     @worker_row(w)
   }
   </tbody>
-</table>
\ No newline at end of file
+</table>
diff --git a/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html b/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
index c3842dbf858b5ffefbfc080a2a7a68eae5ef2e01..ea9542461e5f736b41829f9bea8c07521dcc79f9 100644
--- a/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
@@ -1,20 +1,20 @@
 @(executor: spark.deploy.worker.ExecutorRunner)
 
+@import spark.Utils
+
 <tr>
   <td>@executor.execId</td>
   <td>@executor.cores</td>
-  <td>@executor.memory</td>
+  <td>@Utils.memoryMegabytesToString(executor.memory)</td>
   <td>
     <ul class="unstyled">
       <li><strong>ID:</strong> @executor.jobId</li>
       <li><strong>Name:</strong> @executor.jobDesc.name</li>
       <li><strong>User:</strong> @executor.jobDesc.user</li>
-      <li><strong>Cores:</strong> @executor.jobDesc.cores </li>
-      <li><strong>Memory per Slave:</strong> @executor.jobDesc.memoryPerSlave</li>
     </ul>
   </td>
   <td>
     <a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stdout">stdout</a>
     <a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stderr">stderr</a>
   </td>
-</tr>
\ No newline at end of file
+</tr>
diff --git a/core/src/main/twirl/spark/deploy/worker/index.scala.html b/core/src/main/twirl/spark/deploy/worker/index.scala.html
index 69746ed02cfed598308b745d2f98781087723852..b247307dab06793be7877bee00ffd1255bd4da8c 100644
--- a/core/src/main/twirl/spark/deploy/worker/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/index.scala.html
@@ -1,5 +1,7 @@
 @(worker: spark.deploy.WorkerState)
 
+@import spark.Utils
+
 @spark.deploy.common.html.layout(title = "Spark Worker on " + worker.uri) {
 
   <!-- Worker Details -->
@@ -12,8 +14,8 @@
         (WebUI at <a href="@worker.masterWebUiUrl">@worker.masterWebUiUrl</a>)
       </li>
       <li><strong>Cores:</strong> @worker.cores (@worker.coresUsed Used)</li>
-      <li><strong>Memory:</strong> @{spark.Utils.memoryMegabytesToString(worker.memory)}
-        (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</li>
+      <li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(worker.memory)}
+        (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</li>
     </ul>
   </div>
 </div>
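
A quick sanity check (not part of the patch) for the formatting helpers the patch introduces in spark.deploy.WebUI. It assumes the patched spark.deploy.WebUI object is on the classpath; the WebUIFormatCheck object name is made up for illustration. The expected outputs in the comments follow directly from formatDuration's thresholds (under 60 s, under 10 min, under 60 min, otherwise hours):

import spark.deploy.WebUI

// Hypothetical driver object, shown only to exercise the helpers above.
object WebUIFormatCheck {
  def main(args: Array[String]) {
    // Durations pick a unit by magnitude, matching formatDuration's branches:
    println(WebUI.formatDuration(45 * 1000L))        // "45 s"    (under a minute)
    println(WebUI.formatDuration(9 * 60 * 1000L))    // "9.0 min" (under 10 minutes, one decimal)
    println(WebUI.formatDuration(45 * 60 * 1000L))   // "45 min"  (under an hour)
    println(WebUI.formatDuration(3 * 3600 * 1000L))  // "3.0 h"   (an hour or more)

    // Timestamps are rendered with the shared DATE_FORMAT ("yyyy/MM/dd HH:mm:ss").
    println(WebUI.formatDate(System.currentTimeMillis()))
  }
}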