diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index b67e068844db79c19eb82c8bc72e454fc1214e30..69038844bbe4324d3f28cb1a902c99a7c147b087 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -27,10 +27,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ -import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} -import akka.remote.RemoteClientShutdown +import akka.remote._ import akka.actor.Terminated -import akka.remote.RemoteClientDisconnected import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -55,19 +53,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte // This actor just working as a monitor to watch on Driver Actor. class MonitorActor(driverUrl: String) extends Actor { - var driver: ActorRef = null + var driver: ActorSelection = null override def preStart() { logInfo("Listen to driver: " + driverUrl) - driver = context.actorFor(driverUrl) + driver = context.actorSelection(driverUrl) driver ! "hello" - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(driver) // Doesn't work with remote actors, but useful for testing + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } override def receive = { - case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => - logInfo("Driver terminated or disconnected! Shutting down.") + case x: DisassociatedEvent => + logInfo(s"Driver terminated or disconnected! Shutting down. $x") driverClosed = true } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index baa030b4a4df68409018fd66c9f452973a5dfa51..a6ce1b60a75519ba3e2cb7b5fec55956b05db1c1 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.conf.Configuration import java.util.{Collections, Set => JSet} import java.lang.{Boolean => JBoolean} -object AllocationType extends Enumeration ("HOST", "RACK", "ANY") { +object AllocationType extends Enumeration { type AllocationType = Value val HOST, RACK, ANY = Value } @@ -370,7 +370,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY) val containerRequests: ArrayBuffer[ResourceRequest] = - new ArrayBuffer[ResourceRequest](hostContainerRequests.size() + rackContainerRequests.size() + 1) + new ArrayBuffer[ResourceRequest](hostContainerRequests.size + rackContainerRequests.size + 1) containerRequests ++= hostContainerRequests containerRequests ++= rackContainerRequests