Skip to content
Snippets Groups Projects
Commit fbfe69de authored by Mark Hamstra's avatar Mark Hamstra Committed by Matei Zaharia
Browse files

[SPARK-1685] Cancel retryTimer on restart of Worker or AppClient

See https://issues.apache.org/jira/browse/SPARK-1685 for a more complete description, but in essence: If the Worker or AppClient actor restarts before successfully registering with Master, multiple retryTimers will be running, which will lead to less than the full number of registration retries being attempted before the new actor is forced to give up.

Author: Mark Hamstra <markhamstra@gmail.com>

Closes #602 from markhamstra/SPARK-1685 and squashes the following commits:

11cc088 [Mark Hamstra] retryTimer -> registrationRetryTimer
69c348c [Mark Hamstra] Cancel retryTimer on restart of Worker or AppClient
parent 7b978c1a
No related branches found
No related tags found
No related merge requests found
...@@ -60,6 +60,7 @@ private[spark] class AppClient( ...@@ -60,6 +60,7 @@ private[spark] class AppClient(
var master: ActorSelection = null var master: ActorSelection = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
var alreadyDead = false // To avoid calling listener.dead() multiple times var alreadyDead = false // To avoid calling listener.dead() multiple times
var registrationRetryTimer: Option[Cancellable] = None
override def preStart() { override def preStart() {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
...@@ -83,21 +84,20 @@ private[spark] class AppClient( ...@@ -83,21 +84,20 @@ private[spark] class AppClient(
def registerWithMaster() { def registerWithMaster() {
tryRegisterAllMasters() tryRegisterAllMasters()
import context.dispatcher import context.dispatcher
var retries = 0 var retries = 0
lazy val retryTimer: Cancellable = registrationRetryTimer = Some {
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
retries += 1 retries += 1
if (registered) { if (registered) {
retryTimer.cancel() registrationRetryTimer.foreach(_.cancel())
} else if (retries >= REGISTRATION_RETRIES) { } else if (retries >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.") markDead("All masters are unresponsive! Giving up.")
} else { } else {
tryRegisterAllMasters() tryRegisterAllMasters()
} }
} }
retryTimer // start timer }
} }
def changeMaster(url: String) { def changeMaster(url: String) {
...@@ -177,6 +177,11 @@ private[spark] class AppClient( ...@@ -177,6 +177,11 @@ private[spark] class AppClient(
alreadyDead = true alreadyDead = true
} }
} }
override def postStop() {
registrationRetryTimer.foreach(_.cancel())
}
} }
def start() { def start() {
......
...@@ -100,6 +100,8 @@ private[spark] class Worker( ...@@ -100,6 +100,8 @@ private[spark] class Worker(
val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr) val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
val workerSource = new WorkerSource(this) val workerSource = new WorkerSource(this)
var registrationRetryTimer: Option[Cancellable] = None
def coresFree: Int = cores - coresUsed def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed def memoryFree: Int = memory - memoryUsed
...@@ -161,13 +163,12 @@ private[spark] class Worker( ...@@ -161,13 +163,12 @@ private[spark] class Worker(
def registerWithMaster() { def registerWithMaster() {
tryRegisterAllMasters() tryRegisterAllMasters()
var retries = 0 var retries = 0
lazy val retryTimer: Cancellable = registrationRetryTimer = Some {
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
retries += 1 retries += 1
if (registered) { if (registered) {
retryTimer.cancel() registrationRetryTimer.foreach(_.cancel())
} else if (retries >= REGISTRATION_RETRIES) { } else if (retries >= REGISTRATION_RETRIES) {
logError("All masters are unresponsive! Giving up.") logError("All masters are unresponsive! Giving up.")
System.exit(1) System.exit(1)
...@@ -175,7 +176,7 @@ private[spark] class Worker( ...@@ -175,7 +176,7 @@ private[spark] class Worker(
tryRegisterAllMasters() tryRegisterAllMasters()
} }
} }
retryTimer // start timer }
} }
override def receive = { override def receive = {
...@@ -344,6 +345,7 @@ private[spark] class Worker( ...@@ -344,6 +345,7 @@ private[spark] class Worker(
} }
override def postStop() { override def postStop() {
registrationRetryTimer.foreach(_.cancel())
executors.values.foreach(_.kill()) executors.values.foreach(_.kill())
drivers.values.foreach(_.kill()) drivers.values.foreach(_.kill())
webUi.stop() webUi.stop()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment