From 0ff38c22205f14770ecca1e66378e7c207ca2d1d Mon Sep 17 00:00:00 2001
From: Erik Selin <erik.selin@jadedpixel.com>
Date: Wed, 29 Jan 2014 12:44:54 -0800
Subject: [PATCH] Merge pull request #494 from tyro89/worker_registration_issue

Issue with failed worker registrations

I've been going through the spark source after having some odd issues with workers dying and not coming back. After some digging (I'm very new to scala and spark) I believe I've found a worker registration issue. It looks to me like a failed registration follows the same code path as a successful registration which end up with workers believing they are connected (since they received a `RegisteredWorker` event) even tho they are not registered on the Master.

This is a quick fix that I hope addresses this issue (assuming I didn't completely miss-read the code and I'm about to look like a silly person :P)

I'm opening this pr now to start a chat with you guys while I do some more testing on my side :)

Author: Erik Selin <erik.selin@jadedpixel.com>

== Merge branch commits ==

commit 973012f8a2dcf1ac1e68a69a2086a1b9a50f401b
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Tue Jan 28 23:36:12 2014 -0500

    break logwarning into two lines to respect line character limit.

commit e3754dc5b94730f37e9806974340e6dd93400f85
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Tue Jan 28 21:16:21 2014 -0500

    add log warning when worker registration fails due to attempt to re-register on same address.

commit 14baca241fa7823e1213cfc12a3ff2a9b865b1ed
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Wed Jan 22 21:23:26 2014 -0500

    address code style comment

commit 71c0d7e6f59cd378d4e24994c21140ab893954ee
Author: Erik Selin <erik.selin@jadedpixel.com>
Date:   Wed Jan 22 16:01:42 2014 -0500

    Make a failed registration not persist, not send a `RegisteredWordker` event and not run `schedule` but rather send a `RegisterWorkerFailed` message to the worker attempting to register.
---
 .../apache/spark/deploy/master/Master.scala   | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d49401f2fe..380055728f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -176,10 +176,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       } else {
         val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
           sender, workerWebUiPort, publicAddress)
-        registerWorker(worker)
-        persistenceEngine.addWorker(worker)
-        sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
-        schedule()
+        if (registerWorker(worker)) {
+          persistenceEngine.addWorker(worker)
+          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
+          schedule()
+        } else {
+          val workerAddress = worker.actor.path.address
+          logWarning("Worker registration failed. Attempted to re-register worker at same address: " +
+            workerAddress)
+          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)
+        }
       }
     }
 
@@ -511,7 +517,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
   }
 
-  def registerWorker(worker: WorkerInfo): Unit = {
+  def registerWorker(worker: WorkerInfo): Boolean = {
     // There may be one or more refs to dead workers on this same node (w/ different ID's),
     // remove them.
     workers.filter { w =>
@@ -523,13 +529,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     val workerAddress = worker.actor.path.address
     if (addressToWorker.contains(workerAddress)) {
       logInfo("Attempted to re-register worker at same address: " + workerAddress)
-      return
+      return false
     }
 
     workers += worker
     idToWorker(worker.id) = worker
     actorToWorker(worker.actor) = worker
     addressToWorker(workerAddress) = worker
+    true
   }
 
   def removeWorker(worker: WorkerInfo) {
-- 
GitLab