diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index b30c8e99b560e66de457605056e15cf01cfdcc0b..6ecebe626a7cef9d7be27fef71c84a6f3f76c8c7 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -156,7 +156,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor if (spreadOutJobs) { // Try to spread out each job among all the nodes, until it has all its cores for (job <- waitingJobs if job.coresLeft > 0) { - val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse + val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) + .filter(canUse(job, _)).sortBy(_.coresFree).reverse val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // Number of cores to give on each node var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum) @@ -203,6 +204,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int, publicAddress: String): WorkerInfo = { + // There may be one or more refs to dead workers on this same node (with different IDs); remove them. 
+ workers.filter(w => (w.host == host) && (w.state == WorkerState.DEAD)).foreach(workers -= _) val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress) workers += worker idToWorker(worker.id) = worker @@ -213,7 +216,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor def removeWorker(worker: WorkerInfo) { logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port) - workers -= worker + worker.setState(WorkerState.DEAD) idToWorker -= worker.id actorToWorker -= worker.actor addressToWorker -= worker.actor.path.address diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala index a0a698ef04339cbc1184de70175b81c7140d276c..5a7f5fef8a546812d20719af8d2a0f3dcab1af29 100644 --- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala @@ -14,7 +14,7 @@ private[spark] class WorkerInfo( val publicAddress: String) { var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info - + var state: WorkerState.Value = WorkerState.ALIVE var coresUsed = 0 var memoryUsed = 0 @@ -42,4 +42,8 @@ private[spark] class WorkerInfo( def webUiAddress : String = { "http://" + this.publicAddress + ":" + this.webUiPort } + + def setState(state: WorkerState.Value) = { + this.state = state + } } diff --git a/core/src/main/scala/spark/deploy/master/WorkerState.scala b/core/src/main/scala/spark/deploy/master/WorkerState.scala new file mode 100644 index 0000000000000000000000000000000000000000..0bf35014c806ccc9f61a135b97a5cfc6e791db42 --- /dev/null +++ b/core/src/main/scala/spark/deploy/master/WorkerState.scala @@ -0,0 +1,7 @@ +package spark.deploy.master + +private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") { + type WorkerState = Value + + val ALIVE, DEAD, DECOMMISSIONED = Value +} diff --git 
a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html index c32ab30401c96123b5b06356d86d3df8ec6e9a68..be69e9bf028b0678d7f6acdaa8cb3b131cee4575 100644 --- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html +++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html @@ -7,6 +7,7 @@ <a href="@worker.webUiAddress">@worker.id</href> </td> <td>@{worker.host}:@{worker.port}</td> + <td>@worker.state</td> <td>@worker.cores (@worker.coresUsed Used)</td> <td>@{Utils.memoryMegabytesToString(worker.memory)} (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td> diff --git a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html index fad1af41dc0619cb9baeed01d9d5a8e7f3f204ac..b249411a62551a76ae5b159c88081e963cca66b6 100644 --- a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html +++ b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html @@ -5,6 +5,7 @@ <tr> <th>ID</th> <th>Address</th> + <th>State</th> <th>Cores</th> <th>Memory</th> </tr>