Skip to content
Snippets Groups Projects
Commit e7051767 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #337 from pwendell/worker-liveness-ui

SPARK-616: Logging dead workers in Web UI.
parents 30b47794 bfac06e1
No related branches found
No related tags found
No related merge requests found
...@@ -156,7 +156,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -156,7 +156,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
if (spreadOutJobs) { if (spreadOutJobs) {
// Try to spread out each job among all the nodes, until it has all its cores // Try to spread out each job among all the nodes, until it has all its cores
for (job <- waitingJobs if job.coresLeft > 0) { for (job <- waitingJobs if job.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canUse(job, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum) var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
...@@ -203,6 +204,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -203,6 +204,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int, def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
publicAddress: String): WorkerInfo = { publicAddress: String): WorkerInfo = {
// There may be one or more refs to dead workers on this same node (with different IDs); remove them.
workers.filter(w => (w.host == host) && (w.state == WorkerState.DEAD)).foreach(workers -= _)
val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress) val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
workers += worker workers += worker
idToWorker(worker.id) = worker idToWorker(worker.id) = worker
...@@ -213,7 +216,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor ...@@ -213,7 +216,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
def removeWorker(worker: WorkerInfo) { def removeWorker(worker: WorkerInfo) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port) logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
workers -= worker worker.setState(WorkerState.DEAD)
idToWorker -= worker.id idToWorker -= worker.id
actorToWorker -= worker.actor actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address addressToWorker -= worker.actor.path.address
......
...@@ -14,7 +14,7 @@ private[spark] class WorkerInfo( ...@@ -14,7 +14,7 @@ private[spark] class WorkerInfo(
val publicAddress: String) { val publicAddress: String) {
var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
var state: WorkerState.Value = WorkerState.ALIVE
var coresUsed = 0 var coresUsed = 0
var memoryUsed = 0 var memoryUsed = 0
...@@ -42,4 +42,8 @@ private[spark] class WorkerInfo( ...@@ -42,4 +42,8 @@ private[spark] class WorkerInfo(
def webUiAddress : String = { def webUiAddress : String = {
"http://" + this.publicAddress + ":" + this.webUiPort "http://" + this.publicAddress + ":" + this.webUiPort
} }
/** Overwrites this worker's `state` field with the supplied value. */
def setState(state: WorkerState.Value): Unit = { this.state = state }
} }
package spark.deploy.master
/**
 * States the master can record for a worker.
 *
 * In the visible code a `WorkerInfo` starts out `ALIVE` and `removeWorker`
 * marks it `DEAD`; `DECOMMISSIONED` is declared but not assigned anywhere in
 * the code shown here.
 *
 * The value names are derived by `Enumeration` from the `val` identifiers
 * below, so `toString` yields "ALIVE", "DEAD" and "DECOMMISSIONED" without
 * passing the names to the `Enumeration(names: String*)` constructor, which
 * was deprecated in Scala 2.10 and removed in 2.11.
 */
private[spark] object WorkerState extends Enumeration {
  type WorkerState = Value

  val ALIVE, DEAD, DECOMMISSIONED = Value
}
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
<a href="@worker.webUiAddress">@worker.id</a> <a href="@worker.webUiAddress">@worker.id</a>
</td> </td>
<td>@{worker.host}:@{worker.port}</td> <td>@{worker.host}:@{worker.port}</td>
<td>@worker.state</td>
<td>@worker.cores (@worker.coresUsed Used)</td> <td>@worker.cores (@worker.coresUsed Used)</td>
<td>@{Utils.memoryMegabytesToString(worker.memory)} <td>@{Utils.memoryMegabytesToString(worker.memory)}
(@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td> (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
<tr> <tr>
<th>ID</th> <th>ID</th>
<th>Address</th> <th>Address</th>
<th>State</th>
<th>Cores</th> <th>Cores</th>
<th>Memory</th> <th>Memory</th>
</tr> </tr>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment