diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index dcb12bed4e66bcddab0ac32243436625945fc51e..debbdd4c444cc0095be31a33859fe47326bd160e 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -97,7 +97,8 @@ private[spark] object CoarseGrainedExecutorBackend {
 
     // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
     // before getting started with all our system properties, etc
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+      indestructible = true)
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     System.setProperty("spark.hostPort", sparkHostPort)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index e000531a26f7e27a63ab7ac7cb301923ba643115..e8fecec4a64f30cb2e78d49ced075f4a30dd28d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -36,7 +36,7 @@ private[spark] class SimrSchedulerBackend(
   override def start() {
     super.start()
 
-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"),
       System.getProperty("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index b4451fc7b8e5666a9c93c8e89d27ebb08b224543..df33f6bfb0bb198696a157ea9a6cbee10fdc9da2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -44,7 +44,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
   /**
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
    */
-  def commit(): Long
+  def commit(): Long
   /**
    * Reverts writes that haven't been flushed yet. Callers should invoke this function
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 407e9ffe90c5c76b8bb6855a36523444e9ca0ba6..9f3f1632776b7a5cefbd0065435a2a5e9688189f 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util
 
-import akka.actor.{SparkActorSystem, ActorSystem, ExtendedActorSystem}
+import akka.actor.{IndestructibleActorSystem, ActorSystem, ExtendedActorSystem}
 import com.typesafe.config.ConfigFactory
 import scala.concurrent.duration._
 import scala.concurrent.Await
@@ -34,8 +34,13 @@ private[spark] object AkkaUtils {
    *
    * Note: the `name` parameter is important, as even if a client sends a message to right
    * host + port, if the system name is incorrect, Akka will drop the message.
+   *
+   * If indestructible is set to true, the Actor System will continue running in the event
+   * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
    */
-  def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
+  def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false)
+    : (ActorSystem, Int) = {
+
     val akkaThreads   = System.getProperty("spark.akka.threads", "4").toInt
     val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
 
@@ -70,7 +75,11 @@ private[spark] object AkkaUtils {
       |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
       """.stripMargin)
 
-    val actorSystem = SparkActorSystem(name, akkaConf)
+    val actorSystem = if (indestructible) {
+      IndestructibleActorSystem(name, akkaConf)
+    } else {
+      ActorSystem(name, akkaConf)
+    }
 
     val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
     val boundPort = provider.getDefaultAddress.port.get
diff --git a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
similarity index 72%
rename from core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
rename to core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
index a679fd61428f92a7d3399d9a30b3b8b54d44076b..69519860c6afabe8533fbaa6cfe0471e94e52c78 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@ -10,20 +10,19 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import com.typesafe.config.Config
 
 /**
- * An ActorSystem specific to Spark. Based off of [[akka.actor.ActorSystem]].
- * The only change from the default system is that we do not shut down the ActorSystem
- * in the event of a fatal exception. This is necessary as Spark is allowed to recover
- * from fatal exceptions (see [[org.apache.spark.executor.Executor]]).
+ * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
+ * This is necessary as Spark Executors are allowed to recover from fatal exceptions
+ * (see [[org.apache.spark.executor.Executor]]).
  */
-object SparkActorSystem {
+object IndestructibleActorSystem {
   def apply(name: String, config: Config): ActorSystem =
     apply(name, config, ActorSystem.findClassLoader())
 
   def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
-    new SparkActorSystemImpl(name, config, classLoader).start()
+    new IndestructibleActorSystemImpl(name, config, classLoader).start()
 }
 
-private[akka] class SparkActorSystemImpl(
+private[akka] class IndestructibleActorSystemImpl(
   override val name: String,
   applicationConfig: Config,
   classLoader: ClassLoader)
@@ -36,7 +35,7 @@ private[akka] class SparkActorSystemImpl(
       def uncaughtException(thread: Thread, cause: Throwable): Unit = {
         if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
           log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
-            "ActorSystem tolerating and continuing.... [{}]", thread.getName, name)
+            "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
           //shutdown() //TODO make it configurable
         } else {
           fallbackHandler.uncaughtException(thread, cause)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 421a83c87afdf156b19bee8a006f0dde11fa4c2a..b67e068844db79c19eb82c8bc72e454fc1214e30 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -168,7 +168,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
     System.setProperty("spark.driver.host", driverHost)
     System.setProperty("spark.driver.port", driverPort.toString)
 
-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      driverHost, driverPort.toString,
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
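Note (illustrative, not part of the patch): the akka:// to akka.tcp:// changes in the driver URLs follow from Akka 2.2's remoting, which includes the transport protocol in remote actor addresses. The sketch below shows the crash-tolerance pattern that IndestructibleActorSystemImpl installs: an uncaught-exception handler that logs a fatal error and keeps running, instead of letting the default handler take the whole system down. The object and thread names (TolerateFatalErrorsSketch, "sketch-worker") are hypothetical, and isFatalError is a plausible reading consistent with the ControlThrowable/NonFatal import in the renamed file.

import scala.util.control.{ControlThrowable, NonFatal}

// Standalone sketch of the tolerance pattern used above. Only the handler
// logic mirrors the patch; all names here are invented for illustration.
object TolerateFatalErrorsSketch {

  // Plausible fatal-error check, consistent with the ControlThrowable/NonFatal
  // import: anything not NonFatal (and not an interruption or control-flow
  // signal) counts as fatal.
  private def isFatalError(e: Throwable): Boolean = e match {
    case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
      false
    case _ =>
      true
  }

  def main(args: Array[String]): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit = throw new OutOfMemoryError("simulated fatal error")
    }, "sketch-worker")

    // Install a handler that tolerates fatal errors rather than shutting down,
    // analogous to the handler IndestructibleActorSystemImpl installs in place
    // of Akka's default (which would stop the ActorSystem).
    worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      def uncaughtException(thread: Thread, cause: Throwable): Unit = {
        if (isFatalError(cause)) {
          // Log and carry on; the failing thread dies, the process survives.
          System.err.println(s"Uncaught fatal error from thread [${thread.getName}] " +
            s"tolerating and continuing.... ($cause)")
        } else {
          cause.printStackTrace() // non-fatal: fall back to default-style reporting
        }
      }
    })

    worker.start()
    worker.join()
    println("Still running after the worker's fatal error.")
  }
}

Running the sketch prints the handler's "tolerating and continuing" message followed by the final println, showing that a fatal error on one thread need not terminate the process, which is exactly the property the executor's actor system needs in order to recover.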