diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index d49401f2fe46689c863edc2b54364ff7c2035498..380055728f9793ef9a334b8297c45a4a962d89cf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -176,10 +176,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender, workerWebUiPort, publicAddress) - registerWorker(worker) - persistenceEngine.addWorker(worker) - sender ! RegisteredWorker(masterUrl, masterWebUiUrl) - schedule() + if (registerWorker(worker)) { + persistenceEngine.addWorker(worker) + sender ! RegisteredWorker(masterUrl, masterWebUiUrl) + schedule() + } else { + val workerAddress = worker.actor.path.address + logWarning("Worker registration failed. Attempted to re-register worker at same address: " + + workerAddress) + sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress) + } } } @@ -511,7 +517,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) } - def registerWorker(worker: WorkerInfo): Unit = { + def registerWorker(worker: WorkerInfo): Boolean = { // There may be one or more refs to dead workers on this same node (w/ different ID's), // remove them. workers.filter { w => @@ -523,13 +529,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val workerAddress = worker.actor.path.address if (addressToWorker.contains(workerAddress)) { logInfo("Attempted to re-register worker at same address: " + workerAddress) - return + return false } workers += worker idToWorker(worker.id) = worker actorToWorker(worker.actor) = worker addressToWorker(workerAddress) = worker + true } def removeWorker(worker: WorkerInfo) {