From 09e8be9a6225203337a01e618851e807a1482603 Mon Sep 17 00:00:00 2001
From: Prashant Sharma <prashant.s@imaginea.com>
Date: Tue, 3 Dec 2013 11:27:45 +0530
Subject: [PATCH] Made running SparkActorSystem specific to executors only.

---
 .../spark/executor/CoarseGrainedExecutorBackend.scala |  3 ++-
 .../main/scala/org/apache/spark/util/AkkaUtils.scala  | 11 +++++++++--
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index dcb12bed4e..406e015f08 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -97,7 +97,8 @@ private[spark] object CoarseGrainedExecutorBackend {
 
     // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
     // before getting started with all our system properties, etc
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+      useSparkAS = true)
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     System.setProperty("spark.hostPort", sparkHostPort)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 407e9ffe90..f3e2644a58 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -35,7 +35,9 @@ private[spark] object AkkaUtils {
    * Note: the `name` parameter is important, as even if a client sends a message to right
    * host + port, if the system name is incorrect, Akka will drop the message.
    */
-  def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
+  def createActorSystem(name: String, host: String, port: Int,
+    useSparkAS: Boolean = false): (ActorSystem, Int) = {
+
     val akkaThreads   = System.getProperty("spark.akka.threads", "4").toInt
     val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
 
@@ -70,7 +72,12 @@ private[spark] object AkkaUtils {
       |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
       """.stripMargin)
 
-    val actorSystem = SparkActorSystem(name, akkaConf)
+    val actorSystem = if (useSparkAS) {
+      SparkActorSystem(name, akkaConf)
+    }
+    else {
+      ActorSystem(name, akkaConf)
+    }
 
     val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
     val boundPort = provider.getDefaultAddress.port.get
-- 
GitLab