diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 070f10f729f8059d2f1a125beb3f2162299e95a3..408692ec9c495eb6135c562608ca1e660505e8da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -23,7 +23,6 @@ import scala.concurrent.duration._
 import scala.concurrent.Await
 
 import akka.actor._
-import akka.actor.Terminated
 import akka.pattern.AskTimeoutException
 import akka.pattern.ask
 import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
@@ -62,6 +61,7 @@ private[spark] class Client(
     var alreadyDead = false  // To avoid calling listener.dead() multiple times
 
     override def preStart() {
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
       try {
         registerWithMaster()
       } catch {
@@ -107,7 +107,6 @@ private[spark] class Client(
 
     override def receive = {
       case RegisteredApplication(appId_, masterUrl) =>
-        context.watch(sender)
         prevMaster = sender
         appId = appId_
         registered = true
@@ -123,7 +122,7 @@ private[spark] class Client(
         val fullId = appId + "/" + id
         logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)
-
+
       case ExecutorUpdated(id, state, message, exitStatus) =>
         val fullId = appId + "/" + id
         val messageText = message.map(s => " (" + s + ")").getOrElse("")
@@ -134,13 +133,12 @@ private[spark] class Client(
 
       case MasterChanged(masterUrl, masterWebUiUrl) =>
         logInfo("Master has changed, new master is at " + masterUrl)
-        context.unwatch(prevMaster)
         changeMaster(masterUrl)
         alreadyDisconnected = false
         sender ! MasterChangeAcknowledged(appId)
 
-      case Terminated(actor_) =>
-        logWarning(s"Connection to $actor_ failed; waiting for master to reconnect...")
+      case DisassociatedEvent(_, address, _) =>
+        logWarning(s"Connection to $address failed; waiting for master to reconnect...")
         markDisconnected()
 
       case StopClient =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 25f59271280e76297fa7cfc0179fbb9e4c6a56dc..81fb5c4e43255fa4fb976d76f624a078cb01193f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -147,9 +147,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         RecoveryState.ALIVE
       else
         RecoveryState.RECOVERING
-
       logInfo("I have been elected leader! New state: " + state)
-
       if (state == RecoveryState.RECOVERING) {
         beginRecovery(storedApps, storedWorkers)
         context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
@@ -171,7 +169,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       } else {
         val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
         registerWorker(worker)
-        context.watch(sender)
         persistenceEngine.addWorker(worker)
         sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
         schedule()
@@ -186,7 +183,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       val app = createApplication(description, sender)
       registerApplication(app)
       logInfo("Registered app " + description.name + " with ID " + app.id)
-      context.watch(sender)
       persistenceEngine.addApplication(app)
       sender ! RegisteredApplication(app.id, masterUrl)
       schedule()
@@ -262,15 +258,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         if (canCompleteRecovery) { completeRecovery() }
       }
 
-    case Terminated(actor) => {
-      // The disconnected actor could've been either a worker or an app; remove whichever of
-      // those we have an entry for in the corresponding actor hashmap
-      logInfo(s"$actor got terminated, removing it.")
-      actorToWorker.get(actor).foreach(removeWorker)
-      actorToApp.get(actor).foreach(finishApplication)
-      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
-    }
-
     case DisassociatedEvent(_, address, _) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       logInfo(s"$address got disassociated, removing it.")
@@ -438,8 +425,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         exec.id, ExecutorState.LOST, Some("worker lost"), None)
       exec.application.removeExecutor(exec)
     }
-    context.stop(worker.actor)
-    context.unwatch(worker.actor)
     persistenceEngine.removeWorker(worker)
   }
 
@@ -502,8 +487,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         app.driver ! ApplicationRemoved(state.toString)
       }
       persistenceEngine.removeApplication(app)
-      context.stop(app.driver)
-      context.unwatch(app.driver)
       schedule()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 3a7d0b859b297a1e9e3a23d915c056fdffd9d3c8..0a183afd8e2d75d71b738d63411b241cc9ac686d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -73,7 +73,6 @@ private[spark] class Worker(
 
   val masterLock: Object = new Object()
   var master: ActorSelection = null
-  var prevMaster: ActorRef = null
   var activeMasterUrl: String = ""
   var activeMasterWebUiUrl : String = ""
   @volatile var registered = false
@@ -173,8 +172,6 @@ private[spark] class Worker(
 
     case RegisteredWorker(masterUrl, masterWebUiUrl) =>
       logInfo("Successfully registered with master " + masterUrl)
       registered = true
-      context.watch(sender) // remote death watch for master
-      prevMaster = sender
       changeMaster(masterUrl, masterWebUiUrl)
       context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
@@ -185,8 +182,6 @@ private[spark] class Worker(
 
     case MasterChanged(masterUrl, masterWebUiUrl) =>
       logInfo("Master has changed, new master is at " + masterUrl)
-      context.unwatch(prevMaster)
-      prevMaster = sender
       changeMaster(masterUrl, masterWebUiUrl)
 
       val execs = executors.values.
@@ -245,10 +240,6 @@ private[spark] class Worker(
         }
       }
 
-    case Terminated(actor_) =>
-      logInfo(s"$actor_ terminated !")
-      masterDisconnected()
-
     case x: DisassociatedEvent =>
       logInfo(s"$x Disassociated !")
       masterDisconnected()
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 2818a775d0ebf6bc2ddd5935d58f38fdab3542ea..dcb12bed4e66bcddab0ac32243436625945fc51e 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -51,7 +51,6 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   override def receive = {
     case RegisteredExecutor(sparkProperties) =>
      logInfo("Successfully registered with driver")
-      context.watch(sender) //Start watching for terminated messages.
       // Make this host instead of hostPort ?
       executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
@@ -76,10 +75,6 @@ private[spark] class CoarseGrainedExecutorBackend(
         executor.killTask(taskId)
       }
 
-    case Terminated(actor) =>
-      logError(s"Driver $actor terminated, Shutting down.")
-      System.exit(1)
-
     case x: DisassociatedEvent =>
       logError(s"Driver $x disassociated! Shutting down.")
       System.exit(1)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e316f6b41f73656bb8f28807282a93ffb47bd11d..d614dcbdd829133c13268e9bcd0ae2b69a6d4dde 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -73,7 +73,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
       } else {
         logInfo("Registered executor: " + sender + " with ID " + executorId)
         sender ! RegisteredExecutor(sparkProperties)
-        context.watch(sender)
         executorActor(executorId) = sender
         executorHost(executorId) = Utils.parseHostPort(hostPort)._1
         freeCores(executorId) = cores
@@ -118,9 +117,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
       removeExecutor(executorId, reason)
       sender ! true
 
-    case Terminated(actor) =>
-      actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
-
     case DisassociatedEvent(_, address, _) =>
       addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
 
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 90a5387b2b28bcc80227134ed8ac344c28caf11c..23e9b735f3350de06ecef2a16d7a0412c58239bf 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -46,20 +46,15 @@ private[spark] object AkkaUtils {
 
     val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt
     val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
-    // Since we have our own Heart Beat mechanism and TCP already tracks connections.
-    // Using this makes very little sense. So setting this to a relatively larger value suffices.
-    val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
+    val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "5").toInt
 
     val akkaConf = ConfigFactory.parseString(
       s"""
       |akka.daemonic = on
       |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
       |akka.stdout-loglevel = "ERROR"
-      |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
-      |akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
-      |akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
-      |akka.remote.transport-failure-detector.heartbeat-interval = 30 s
-      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = ${akkaHeartBeatPauses + 10} s
+      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
+      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
       |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
diff --git a/spark-class b/spark-class
index 78d6e073b1ef84c0b63b03ea55fec1bd4fc81bcb..713404d07755758ea327f16106e1a07cba599cb3 100755
--- a/spark-class
+++ b/spark-class
@@ -136,3 +136,4 @@
 fi
 
 exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
+
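
Note on the pattern: throughout this patch, remote death watch (context.watch on a
remote actor plus a Terminated handler) is replaced by a subscription to Akka's
remoting lifecycle event stream, reacting to DisassociatedEvent instead. Death
watch runs its own watch-failure detector with its own heartbeats, which is why
the watch-failure-detector settings disappear from AkkaUtils: after this change
only the transport failure detector decides when a connection is considered lost.
A minimal standalone sketch of the pattern, assuming Akka 2.2.x remoting APIs
(the listener class name and log text are illustrative, not part of the patch):

    import akka.actor._
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    class RemoteLifecycleListener extends Actor with ActorLogging {
      override def preStart() {
        // Subscribe once at startup: the event stream then delivers lifecycle
        // events for every remote connection of this ActorSystem, with no
        // per-actor death watch required.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case DisassociatedEvent(_, remoteAddress, _) =>
          // Fired when the transport to remoteAddress is torn down; the same
          // cue the Client, Worker and executor backends above use to mark
          // the master or driver as disconnected.
          log.warning(s"Lost connection to $remoteAddress")
        case _: RemotingLifecycleEvent => // ignore other lifecycle transitions
      }
    }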