Skip to content
Snippets Groups Projects
Commit a14807e8 authored by Andrew Or's avatar Andrew Or Committed by Aaron Davidson
Browse files

[SPARK-2147 / 2161] Show removed executors on the UI

This PR includes two changes
- **[SPARK-2147]** When an application finishes cleanly (i.e. `sc.stop()` is called), all of its executors used to disappear from the Master UI. This no longer happens.
- **[SPARK-2161]** This adds a "Removed Executors" table to Master UI, so the user can find out why their executors died from the logs, for instance. The equivalent table already existed in the Worker UI, but was hidden because of a bug (the comment `//scalastyle:off` disconnected the `Seq[Node]` that represents the HTML for table).

This should go into 1.0.1 if possible.

Author: Andrew Or <andrewor14@gmail.com>

Closes #1102 from andrewor14/remember-removed-executors and squashes the following commits:

2e2298f [Andrew Or] Add hash code method to ExecutorInfo (minor)
abd72e0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into remember-removed-executors
792f992 [Andrew Or] Add missing equals method in ExecutorInfo
3390b49 [Andrew Or] Add executor state column to WorkerPage
161f8a2 [Andrew Or] Display finished executors table (fix bug)
fbb65b8 [Andrew Or] Removed unused method
c89bb6e [Andrew Or] Add table for removed executors in MasterWebUI
fe47402 [Andrew Or] Show exited executors on the Master UI
parent 443f5e1b
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master ...@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import java.util.Date import java.util.Date
import scala.collection.mutable import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorRef import akka.actor.ActorRef
...@@ -36,6 +37,7 @@ private[spark] class ApplicationInfo( ...@@ -36,6 +37,7 @@ private[spark] class ApplicationInfo(
@transient var state: ApplicationState.Value = _ @transient var state: ApplicationState.Value = _
@transient var executors: mutable.HashMap[Int, ExecutorInfo] = _ @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
@transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
@transient var coresGranted: Int = _ @transient var coresGranted: Int = _
@transient var endTime: Long = _ @transient var endTime: Long = _
@transient var appSource: ApplicationSource = _ @transient var appSource: ApplicationSource = _
...@@ -51,6 +53,7 @@ private[spark] class ApplicationInfo( ...@@ -51,6 +53,7 @@ private[spark] class ApplicationInfo(
endTime = -1L endTime = -1L
appSource = new ApplicationSource(this) appSource = new ApplicationSource(this)
nextExecutorId = 0 nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorInfo]
} }
private def newExecutorId(useID: Option[Int] = None): Int = { private def newExecutorId(useID: Option[Int] = None): Int = {
...@@ -74,6 +77,7 @@ private[spark] class ApplicationInfo( ...@@ -74,6 +77,7 @@ private[spark] class ApplicationInfo(
def removeExecutor(exec: ExecutorInfo) { def removeExecutor(exec: ExecutorInfo) {
if (executors.contains(exec.id)) { if (executors.contains(exec.id)) {
removedExecutors += executors(exec.id)
executors -= exec.id executors -= exec.id
coresGranted -= exec.cores coresGranted -= exec.cores
} }
......
...@@ -34,4 +34,19 @@ private[spark] class ExecutorInfo( ...@@ -34,4 +34,19 @@ private[spark] class ExecutorInfo(
} }
def fullId: String = application.id + "/" + id def fullId: String = application.id + "/" + id
override def equals(other: Any): Boolean = {
other match {
case info: ExecutorInfo =>
fullId == info.fullId &&
worker.id == info.worker.id &&
cores == info.cores &&
memory == info.memory
case _ => false
}
}
override def toString: String = fullId
override def hashCode: Int = toString.hashCode()
} }
...@@ -25,7 +25,7 @@ import scala.xml.Node ...@@ -25,7 +25,7 @@ import scala.xml.Node
import akka.pattern.ask import akka.pattern.ask
import org.json4s.JValue import org.json4s.JValue
import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.ui.{WebUIPage, UIUtils}
...@@ -57,43 +57,55 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app ...@@ -57,43 +57,55 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
}) })
val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs") val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
val executors = app.executors.values.toSeq val allExecutors = (app.executors.values ++ app.removedExecutors).toSet.toSeq
val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors) // This includes executors that are either still running or have exited cleanly
val executors = allExecutors.filter { exec =>
!ExecutorState.isFinished(exec.state) || exec.state == ExecutorState.EXITED
}
val removedExecutors = allExecutors.diff(executors)
val executorsTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
val removedExecutorsTable = UIUtils.listingTable(executorHeaders, executorRow, removedExecutors)
val content = val content =
<div class="row-fluid"> <div class="row-fluid">
<div class="span12"> <div class="span12">
<ul class="unstyled"> <ul class="unstyled">
<li><strong>ID:</strong> {app.id}</li> <li><strong>ID:</strong> {app.id}</li>
<li><strong>Name:</strong> {app.desc.name}</li> <li><strong>Name:</strong> {app.desc.name}</li>
<li><strong>User:</strong> {app.desc.user}</li> <li><strong>User:</strong> {app.desc.user}</li>
<li><strong>Cores:</strong> <li><strong>Cores:</strong>
{ {
if (app.desc.maxCores.isEmpty) { if (app.desc.maxCores.isEmpty) {
"Unlimited (%s granted)".format(app.coresGranted) "Unlimited (%s granted)".format(app.coresGranted)
} else { } else {
"%s (%s granted, %s left)".format( "%s (%s granted, %s left)".format(
app.desc.maxCores.get, app.coresGranted, app.coresLeft) app.desc.maxCores.get, app.coresGranted, app.coresLeft)
} }
} }
</li> </li>
<li> <li>
<strong>Executor Memory:</strong> <strong>Executor Memory:</strong>
{Utils.megabytesToString(app.desc.memoryPerSlave)} {Utils.megabytesToString(app.desc.memoryPerSlave)}
</li> </li>
<li><strong>Submit Date:</strong> {app.submitDate}</li> <li><strong>Submit Date:</strong> {app.submitDate}</li>
<li><strong>State:</strong> {app.state}</li> <li><strong>State:</strong> {app.state}</li>
<li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li> <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
</ul> </ul>
</div>
</div> </div>
</div>
<div class="row-fluid"> <!-- Executors --> <div class="row-fluid"> <!-- Executors -->
<div class="span12"> <div class="span12">
<h4> Executor Summary </h4> <h4> Executor Summary </h4>
{executorTable} {executorsTable}
</div> {
</div>; if (removedExecutors.nonEmpty) {
<h4> Removed Executors </h4> ++
removedExecutorsTable
}
}
</div>
</div>;
UIUtils.basicSparkPage(content, "Application: " + app.desc.name) UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
} }
......
...@@ -46,74 +46,62 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { ...@@ -46,74 +46,62 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout) val workerState = Await.result(stateFuture, timeout)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs") val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs")
val runningExecutors = workerState.executors
val runningExecutorTable = val runningExecutorTable =
UIUtils.listingTable(executorHeaders, executorRow, workerState.executors) UIUtils.listingTable(executorHeaders, executorRow, runningExecutors)
val finishedExecutors = workerState.finishedExecutors
val finishedExecutorTable = val finishedExecutorTable =
UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors) UIUtils.listingTable(executorHeaders, executorRow, finishedExecutors)
val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes") val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes")
val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers) val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers) val finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers)
// For now we only show driver information if the user has submitted drivers to the cluster. // For now we only show driver information if the user has submitted drivers to the cluster.
// This is until we integrate the notion of drivers and applications in the UI. // This is until we integrate the notion of drivers and applications in the UI.
def hasDrivers = runningDrivers.length > 0 || finishedDrivers.length > 0
val content = val content =
<div class="row-fluid"> <!-- Worker Details --> <div class="row-fluid"> <!-- Worker Details -->
<div class="span12"> <div class="span12">
<ul class="unstyled"> <ul class="unstyled">
<li><strong>ID:</strong> {workerState.workerId}</li> <li><strong>ID:</strong> {workerState.workerId}</li>
<li><strong> <li><strong>
Master URL:</strong> {workerState.masterUrl} Master URL:</strong> {workerState.masterUrl}
</li> </li>
<li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li> <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
<li><strong>Memory:</strong> {Utils.megabytesToString(workerState.memory)} <li><strong>Memory:</strong> {Utils.megabytesToString(workerState.memory)}
({Utils.megabytesToString(workerState.memoryUsed)} Used)</li> ({Utils.megabytesToString(workerState.memoryUsed)} Used)</li>
</ul> </ul>
<p><a href={workerState.masterWebUiUrl}>Back to Master</a></p> <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
</div>
</div> </div>
</div>
<div class="row-fluid"> <!-- Running Executors --> <div class="row-fluid"> <!-- Executors and Drivers -->
<div class="span12"> <div class="span12">
<h4> Running Executors {workerState.executors.size} </h4> <h4> Running Executors ({runningExecutors.size}) </h4>
{runningExecutorTable} {runningExecutorTable}
</div> {
</div> if (runningDrivers.nonEmpty) {
// scalastyle:off <h4> Running Drivers ({runningDrivers.size}) </h4> ++
<div> runningDriverTable
{if (hasDrivers) }
<div class="row-fluid"> <!-- Running Drivers -->
<div class="span12">
<h4> Running Drivers {workerState.drivers.size} </h4>
{runningDriverTable}
</div>
</div>
} }
</div> {
if (finishedExecutors.nonEmpty) {
<div class="row-fluid"> <!-- Finished Executors --> <h4>Finished Executors ({finishedExecutors.size}) </h4> ++
<div class="span12"> finishedExecutorTable
<h4> Finished Executors </h4> }
{finishedExecutorTable}
</div>
</div>
<div>
{if (hasDrivers)
<div class="row-fluid"> <!-- Finished Drivers -->
<div class="span12">
<h4> Finished Drivers </h4>
{finishedDriverTable}
</div>
</div>
} }
</div>; {
// scalastyle:on if (finishedDrivers.nonEmpty) {
<h4> Finished Drivers ({finishedDrivers.size}) </h4> ++
finishedDriverTable
}
}
</div>
</div>;
UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format( UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
workerState.host, workerState.port)) workerState.host, workerState.port))
} }
...@@ -122,6 +110,7 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { ...@@ -122,6 +110,7 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
<tr> <tr>
<td>{executor.execId}</td> <td>{executor.execId}</td>
<td>{executor.cores}</td> <td>{executor.cores}</td>
<td>{executor.state}</td>
<td sorttable_customkey={executor.memory.toString}> <td sorttable_customkey={executor.memory.toString}>
{Utils.megabytesToString(executor.memory)} {Utils.megabytesToString(executor.memory)}
</td> </td>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment