Commit 0ff38c22 authored by Erik Selin, committed by Patrick Wendell

Merge pull request #494 from tyro89/worker_registration_issue

Issue with failed worker registrations

I've been going through the Spark source after having some odd issues with workers dying and not coming back. After some digging (I'm very new to Scala and Spark) I believe I've found a worker registration issue. It looks to me like a failed registration follows the same code path as a successful registration, which ends up with workers believing they are connected (since they received a `RegisteredWorker` event) even though they are not registered on the Master.

This is a quick fix that I hope addresses this issue (assuming I didn't completely misread the code and I'm about to look like a silly person :P)

I'm opening this PR now to start a chat with you guys while I do some more testing on my side :)
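
To illustrate the idea outside of Spark, here is a minimal sketch of the pattern: the registration method reports whether it succeeded, and the caller picks the reply based on that result. `MiniMaster`, `handleRegister`, the simplified `WorkerInfo` and the `Reply` types are hypothetical stand-ins made up for this example. They are not Spark's actual classes, and there is no actor messaging or persistence here.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; not Spark's actual classes.
final case class WorkerInfo(id: String, address: String)

sealed trait Reply
final case class RegisteredWorker(masterUrl: String) extends Reply
final case class RegisterWorkerFailed(message: String) extends Reply

class MiniMaster(masterUrl: String) {
  private val addressToWorker = mutable.HashMap.empty[String, WorkerInfo]

  // Returns true only if the worker was actually added to the registry.
  def registerWorker(worker: WorkerInfo): Boolean = {
    if (addressToWorker.contains(worker.address)) {
      false
    } else {
      addressToWorker(worker.address) = worker
      true
    }
  }

  // The reply depends on the registration result instead of always
  // being RegisteredWorker.
  def handleRegister(worker: WorkerInfo): Reply = {
    if (registerWorker(worker)) {
      RegisteredWorker(masterUrl)
    } else {
      RegisterWorkerFailed(
        "Attempted to re-register worker at same address: " + worker.address)
    }
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val master = new MiniMaster("spark://master:7077")
    // First registration at this address succeeds.
    println(master.handleRegister(WorkerInfo("w1", "host-a:1234")))
    // Second registration at the same address is rejected.
    println(master.handleRegister(WorkerInfo("w2", "host-a:1234")))
  }
}
```

In this sketch the second worker gets a `RegisterWorkerFailed` reply rather than a false confirmation, which is the behavior this PR is aiming for.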

Author: Erik Selin <erik.selin@jadedpixel.com>

== Merge branch commits ==

commit 973012f8a2dcf1ac1e68a69a2086a1b9a50f401b
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Tue Jan 28 23:36:12 2014 -0500

    break logwarning into two lines to respect line character limit.

commit e3754dc5b94730f37e9806974340e6dd93400f85
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Tue Jan 28 21:16:21 2014 -0500

    add log warning when worker registration fails due to attempt to re-register on same address.

commit 14baca241fa7823e1213cfc12a3ff2a9b865b1ed
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Wed Jan 22 21:23:26 2014 -0500

    address code style comment

commit 71c0d7e6f59cd378d4e24994c21140ab893954ee
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Wed Jan 22 16:01:42 2014 -0500

    Make a failed registration not persist, not send a `RegisteredWorker` event and not run `schedule` but rather send a `RegisterWorkerFailed` message to the worker attempting to register.
parent 79302096
@@ -176,10 +176,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       } else {
         val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
           sender, workerWebUiPort, publicAddress)
-        registerWorker(worker)
-        persistenceEngine.addWorker(worker)
-        sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
-        schedule()
+        if (registerWorker(worker)) {
+          persistenceEngine.addWorker(worker)
+          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
+          schedule()
+        } else {
+          val workerAddress = worker.actor.path.address
+          logWarning("Worker registration failed. Attempted to re-register worker at same address: " +
+            workerAddress)
+          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)
+        }
       }
     }
@@ -511,7 +517,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
   }
-  def registerWorker(worker: WorkerInfo): Unit = {
+  def registerWorker(worker: WorkerInfo): Boolean = {
     // There may be one or more refs to dead workers on this same node (w/ different ID's),
     // remove them.
     workers.filter { w =>
@@ -523,13 +529,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     val workerAddress = worker.actor.path.address
     if (addressToWorker.contains(workerAddress)) {
       logInfo("Attempted to re-register worker at same address: " + workerAddress)
-      return
+      return false
     }
     workers += worker
     idToWorker(worker.id) = worker
     actorToWorker(worker.actor) = worker
     addressToWorker(workerAddress) = worker
+    true
   }
   def removeWorker(worker: WorkerInfo) {