From 276c37a51c9a6188dbbe02754935540ace338dd1 Mon Sep 17 00:00:00 2001
From: Prashant Sharma <scrapcodes@gmail.com>
Date: Sun, 22 Sep 2013 08:20:12 +0530
Subject: [PATCH] Akka 2.2 migration
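
Migrate the standalone deploy code, executor backends, streaming
receivers and the build from Akka 2.1.4 to Akka 2.2.1. The mechanical
changes are:

* Remote actor URLs now name the transport in their scheme:
  "akka://..." becomes "akka.tcp://...".
* RemoteClientLifeCycleEvent and its subclasses are gone; subscribers
  listen for RemotingLifecycleEvent, with DisassociatedEvent and
  AssociationErrorEvent replacing RemoteClientDisconnected and
  RemoteClientShutdown.
* The Netty transport settings move from akka.remote.netty.* to
  akka.remote.netty.tcp.*, and akka.event-handlers is renamed
  akka.loggers.
* Props(new X(...)) closures become Props(classOf[X], args...) so that
  actor creation does not capture the enclosing scope.
* akka-zeromq now delivers frames as akka.util.ByteString instead of
  Seq[Byte].

A minimal sketch of the new lifecycle-event handling (the actor and
field names here are illustrative, not taken from the Spark sources):

    import akka.actor.{Actor, Address}
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    class RemoteWatcher(watchedAddress: Address) extends Actor {
      override def preStart() {
        // Akka 2.2 publishes all remoting events under one supertype.
        context.system.eventStream.subscribe(
          self, classOf[RemotingLifecycleEvent])
      }
      def receive = {
        // Replaces 2.1's RemoteClientDisconnected(transport, address).
        case DisassociatedEvent(_, remoteAddress, _)
            if remoteAddress == watchedAddress =>
          context.stop(self)
      }
    }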

---
 .../scala/org/apache/spark/SparkEnv.scala     |  2 +-
 .../apache/spark/deploy/client/Client.scala   | 10 +++---
 .../apache/spark/deploy/master/Master.scala   | 15 +++++++--------
 .../apache/spark/deploy/worker/Worker.scala   | 12 +++----
 .../org/apache/spark/executor/Executor.scala  |  2 +-
 .../executor/StandaloneExecutorBackend.scala  | 18 +++++++++---------
 .../cluster/SparkDeploySchedulerBackend.scala |  2 +-
 .../cluster/StandaloneSchedulerBackend.scala  | 12 +++----
 .../mesos/CoarseMesosSchedulerBackend.scala   |  2 +-
 .../org/apache/spark/util/AkkaUtils.scala     | 23 +++++++-------
 .../org/apache/spark/DistributedSuite.scala   |  2 +-
 .../scala/org/apache/spark/DriverSuite.scala  |  2 +-
 .../apache/spark/MapOutputTrackerSuite.scala  |  2 +-
 .../streaming/examples/ActorWordCount.scala   |  2 +-
 .../streaming/examples/ZeroMQWordCount.scala  |  8 +++--
 project/SparkBuild.scala                      | 18 +++++------
 .../spark/streaming/StreamingContext.scala    |  5 +--
 .../api/java/JavaStreamingContext.scala       |  7 +++--
 .../dstream/NetworkInputDStream.scala         |  2 +-
 .../streaming/receivers/ActorReceiver.scala   |  4 +--
 .../streaming/receivers/ZeroMQReceiver.scala  |  7 +++--
 .../apache/spark/streaming/JavaAPISuite.java  |  2 +-
 .../deploy/yarn/YarnAllocationHandler.scala   |  2 +-
 23 files changed, 81 insertions(+), 80 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 1e63b54b7a..a267407c67 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -169,7 +169,7 @@ object SparkEnv extends Logging {
         val driverHost: String = System.getProperty("spark.driver.host", "localhost")
         val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
         Utils.checkHost(driverHost, "Expected hostname")
-        val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
+        val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
         logInfo("Connecting to " + name + ": " + url)
         actorSystem.actorFor(url)
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 14a90934f6..164386782c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -26,9 +26,7 @@ import akka.actor._
 import akka.actor.Terminated
 import akka.pattern.AskTimeoutException
 import akka.pattern.ask
-import akka.remote.RemoteClientDisconnected
-import akka.remote.RemoteClientLifeCycleEvent
-import akka.remote.RemoteClientShutdown
+import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -61,7 +59,7 @@ private[spark] class Client(
         master = context.actorFor(Master.toAkkaUrl(masterUrl))
         masterAddress = master.path.address
         master ! RegisterApplication(appDescription)
-        context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
         context.watch(master)  // Doesn't work with remote actors, but useful for testing
       } catch {
         case e: Exception =>
@@ -99,12 +97,12 @@ private[spark] class Client(
         markDisconnected()
         context.stop(self)
 
-      case RemoteClientDisconnected(transport, address) if address == masterAddress =>
+      case DisassociatedEvent(_, address, _) if address == masterAddress =>
         logError("Connection to master failed; stopping client")
         markDisconnected()
         context.stop(self)
 
-      case RemoteClientShutdown(transport, address) if address == masterAddress =>
+      case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
         logError("Connection to master failed; stopping client")
         markDisconnected()
         context.stop(self)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2efd16bca0..cb0fe6a850 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -25,9 +25,8 @@ import scala.concurrent.Await
 import scala.concurrent.duration._
 
 import akka.actor._
-import akka.actor.Terminated
 import akka.pattern.ask
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.remote._
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -81,7 +80,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   override def preStart() {
     logInfo("Starting Spark master at spark://" + host + ":" + port)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
-    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     webUi.start()
     import context.dispatcher
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
@@ -165,13 +164,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       actorToApp.get(actor).foreach(finishApplication)
     }
 
-    case RemoteClientDisconnected(transport, address) => {
+    case DisassociatedEvent(_, address, _) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       addressToWorker.get(address).foreach(removeWorker)
       addressToApp.get(address).foreach(finishApplication)
     }
 
-    case RemoteClientShutdown(transport, address) => {
+    case AssociationErrorEvent(_, _, address, _) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       addressToWorker.get(address).foreach(removeWorker)
       addressToApp.get(address).foreach(finishApplication)
@@ -376,11 +375,11 @@ private[spark] object Master {
     actorSystem.awaitTermination()
   }
 
-  /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
+  /** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
   def toAkkaUrl(sparkUrl: String): String = {
     sparkUrl match {
       case sparkUrlRegex(host, port) =>
-        "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+        "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
       case _ =>
         throw new SparkException("Invalid master URL: " + sparkUrl)
     }
@@ -388,7 +387,7 @@
 
   def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
-    val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
+    val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
     val timeoutDuration = Duration.create(
       System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
     implicit val timeout = Timeout(timeoutDuration)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a0a9d1040a..1f04c1eea5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,9 +25,9 @@ import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 
 import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
 
-import org.apache.spark.{Logging}
+import org.apache.spark.Logging
 import org.apache.spark.deploy.ExecutorState
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
@@ -113,7 +113,7 @@ private[spark] class Worker(
     logInfo("Connecting to master " + masterUrl)
     master = context.actorFor(Master.toAkkaUrl(masterUrl))
     master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
-    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     context.watch(master) // Doesn't work with remote actors, but useful for testing
   }
 
@@ -165,7 +165,7 @@ private[spark] class Worker(
           logInfo("Asked to kill unknown executor " + fullId)
       }
 
-    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+    case _: Terminated | _: DisassociatedEvent | _: AssociationErrorEvent =>
       masterDisconnected()
 
     case RequestWorkerState => {
@@ -207,8 +207,8 @@ private[spark] object Worker {
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
-    val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
-      masterUrl, workDir)), name = "Worker")
+    actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
+      masterUrl, workDir), name = "Worker")
     (actorSystem, boundPort)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index ceae3b8289..99a4a95e82 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -105,7 +105,7 @@ private[spark] class Executor(
   SparkEnv.set(env)
   env.metricsSystem.registerSource(executorSource)
 
-  private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
+  private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size")
 
   // Start worker thread pool
   val threadPool = new ThreadPoolExecutor(
diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
index 7839023868..46f0ef2cc6 100644
--- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
@@ -19,13 +19,13 @@ package org.apache.spark.executor
 
 import java.nio.ByteBuffer
 
-import akka.actor.{ActorRef, Actor, Props, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.actor._
+import akka.remote._
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
 import org.apache.spark.util.{Utils, AkkaUtils}
 
 
 private[spark] class StandaloneExecutorBackend(
@@ -40,14 +40,14 @@ private[spark] class StandaloneExecutorBackend(
   Utils.checkHostPort(hostPort, "Expected hostport")
 
   var executor: Executor = null
-  var driver: ActorRef = null
+  var driver: ActorSelection = null
 
   override def preStart() {
     logInfo("Connecting to driver: " + driverUrl)
-    driver = context.actorFor(driverUrl)
+    driver = context.actorSelection(driverUrl)
     driver ! RegisterExecutor(executorId, hostPort, cores)
-    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-    context.watch(driver) // Doesn't work with remote actors, but useful for testing
+    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    // context.watch(driver) // Doesn't work with remote actors, but useful for testing
   }
 
   override def receive = {
@@ -69,7 +69,7 @@ private[spark] class StandaloneExecutorBackend(
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
 
-    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+    case _: Terminated | _: DisassociatedEvent | _: AssociationErrorEvent =>
       logError("Driver terminated or disconnected! Shutting down.")
       System.exit(1)
   }
@@ -90,8 +90,8 @@ private[spark] object StandaloneExecutorBackend {
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     System.setProperty("spark.hostPort", sparkHostPort)
-    val actor = actorSystem.actorOf(
-      Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
+    actorSystem.actorOf(
+      Props(classOf[StandaloneExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
       name = "Executor")
     actorSystem.awaitTermination()
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c49768c0c..fa83ae19d6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -42,7 +42,7 @@ private[spark] class SparkDeploySchedulerBackend(
     super.start()
 
     // The endpoint for executors to talk to us
-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
       StandaloneSchedulerBackend.ACTOR_NAME)
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index addfa077c1..49f668eb32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -25,7 +25,7 @@ import scala.concurrent.duration._
 
 import akka.actor._
 import akka.pattern.ask
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
 
 import org.apache.spark.{SparkException, Logging, TaskState}
 import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
@@ -53,7 +53,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
 
     override def preStart() {
       // Listen for remote client disconnection events, since they don't go through Akka's watch()
-      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
       // Periodically revive offers to allow delay scheduling to work
       val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
@@ -101,11 +101,11 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       case Terminated(actor) =>
         actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
 
-      case RemoteClientDisconnected(transport, address) =>
-        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
+      case DisassociatedEvent(_, remoteAddress, _) =>
+        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
 
-      case RemoteClientShutdown(transport, address) =>
-        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
+      case AssociationErrorEvent(_, _, remoteAddress, _) =>
+        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
     }
 
     // Make fake resource offers on all executors
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 3dbe61d706..babe875fa1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -119,7 +119,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"),
       System.getProperty("spark.driver.port"),
       StandaloneSchedulerBackend.ACTOR_NAME)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index e674d120ea..af1c36b34d 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -48,28 +48,27 @@ private[spark] object AkkaUtils {
 
     val akkaConf = ConfigFactory.parseString("""
       akka.daemonic = on
-      akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
+      akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
       akka.stdout-loglevel = "ERROR"
       akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
-      akka.remote.netty.hostname = "%s"
-      akka.remote.netty.port = %d
-      akka.remote.netty.connection-timeout = %ds
-      akka.remote.netty.message-frame-size = %d MiB
-      akka.remote.netty.execution-pool-size = %d
+      akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
+      akka.remote.netty.tcp.hostname = "%s"
+      akka.remote.netty.tcp.port = %d
+      akka.remote.netty.tcp.connection-timeout = %ds
+      akka.remote.netty.tcp.message-frame-size = %d MiB
+      akka.remote.netty.tcp.execution-pool-size = %d
       akka.actor.default-dispatcher.throughput = %d
       akka.remote.log-remote-lifecycle-events = %s
-      akka.remote.netty.write-timeout = %ds
-      """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
-        lifecycleEvents, akkaWriteTimeout))
+      """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
+        lifecycleEvents))
 
     val actorSystem = ActorSystem(name, akkaConf)
 
     // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
     // hack because Akka doesn't let you figure out the port through the public API yet.
     val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
-    val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
-    return (actorSystem, boundPort)
+    val boundPort = provider.getDefaultAddress.port.get
+    (actorSystem, boundPort)
   }
 
 }
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 7a856d4081..c719a54a61 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -325,7 +325,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     // when running under LocalScheduler:
     sc = new SparkContext("local-cluster[1,1,512]", "test")
     val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size").toInt
     val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)}
     val exception = intercept[SparkException] {
       rdd.reduce((x, y) => x)
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 01a72d8401..6d1695eae7 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ class DriverSuite extends FunSuite with Timeouts {
     // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
     val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
     forAll(masters) { (master: String) =>
-      failAfter(30 seconds) {
+      failAfter(60 seconds) {
         Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master),
           new File(System.getenv("SPARK_HOME")))
       }
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 6013320eaa..18fb1bf590 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -109,7 +109,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
     val slaveTracker = new MapOutputTracker()
     slaveTracker.trackerActor = slaveSystem.actorFor(
-        "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
+        "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker")
 
     masterTracker.registerShuffle(10, 1)
     masterTracker.incrementEpoch()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 13aa24fa1a..08e399f9ee 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -165,7 +165,7 @@ object ActorWordCount {
      */
 
     val lines = ssc.actorStream[String](
-      Props(new SampleActorReceiver[String]("akka://test@%s:%s/user/FeederActor".format(
+      Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
         host, port.toInt))), "SampleReceiver")
 
     //compute wordcount
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index c8743b9e25..e83ce78aa5 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -23,6 +23,7 @@ import akka.zeromq._
 import org.apache.spark.streaming.{ Seconds, StreamingContext }
 import org.apache.spark.streaming.StreamingContext._
 import akka.zeromq.Subscribe
+import akka.util.ByteString
 
 /**
  * A simple publisher for demonstration purposes, repeatedly publishes random Messages
@@ -40,10 +41,11 @@ object SimpleZeroMQPublisher {
     val acs: ActorSystem = ActorSystem()
 
     val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
-    val messages: Array[String] = Array("words ", "may ", "count ")
+    implicit def stringToByteString(x: String): ByteString = ByteString(x)
+    val messages: List[ByteString] = List("words ", "may ", "count ")
     while (true) {
       Thread.sleep(1000)
-      pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList)
+      pubSocket ! ZMQMessage(ByteString(topic) :: messages)
     }
     acs.awaitTermination()
   }
@@ -78,7 +80,7 @@ object ZeroMQWordCount {
     val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2),
       System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
 
-    def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
+    def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
 
     //For this stream, a zeroMQ publisher should be running.
     val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 5e7ed81c1e..f18ebf1400 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -81,7 +81,7 @@ object SparkBuild extends Build {
     organization       := "org.apache.spark",
     version            := "0.8.0-SNAPSHOT",
     scalaVersion       := "2.10.2",
-    scalacOptions      := Seq("-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION),
+//    scalacOptions      := Seq("-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION),
     javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
     unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
     retrieveManaged := true,
@@ -150,7 +150,7 @@ object SparkBuild extends Build {
 */
 
     libraryDependencies ++= Seq(
-        "io.netty"          % "netty-all"       % "4.0.0.Beta2",
+        "io.netty"          % "netty-all"       % "4.0.0.CR1",
         "org.eclipse.jetty" % "jetty-server"    % "7.6.8.v20121106",
         "org.scalatest"    %% "scalatest"       % "1.9.1"  % "test",
         "org.scalacheck"   %% "scalacheck"      % "1.10.0" % "test",
@@ -183,9 +183,9 @@ object SparkBuild extends Build {
   def coreSettings = sharedSettings ++ Seq(
     name := "spark-core",
     resolvers ++= Seq(
-      "JBoss Repository"     at "http://repository.jboss.org/nexus/content/repositories/releases/",
-      "Spray Repository"     at "http://repo.spray.cc/",
-      "Cloudera Repository"  at "https://repository.cloudera.com/artifactory/cloudera-repos/"
+      // "JBoss Repository"     at "http://repository.jboss.org/nexus/content/repositories/releases/",
+      // "Spray Repository"     at "http://repo.spray.cc/",
+       "Cloudera Repository"  at "https://repository.cloudera.com/artifactory/cloudera-repos/"
     ),
 
     libraryDependencies ++= Seq(
@@ -200,9 +200,9 @@ object SparkBuild extends Build {
         "org.ow2.asm"              % "asm"              % "4.0",
         "com.google.protobuf"      % "protobuf-java"    % "2.4.1",
         "de.javakaffee"            % "kryo-serializers" % "0.22",
-        "com.typesafe.akka"       %% "akka-remote"      % "2.1.4"        excludeAll(excludeNetty),
-        "com.typesafe.akka"       %% "akka-slf4j"       % "2.1.4"        excludeAll(excludeNetty),
-        "net.liftweb"             %% "lift-json"        % "2.5.1",
+        "com.typesafe.akka"       %% "akka-remote"      % "2.2.1"  excludeAll(excludeNetty), 
+        "com.typesafe.akka"       %% "akka-slf4j"       % "2.2.1"  excludeAll(excludeNetty),
+        "net.liftweb"             %% "lift-json"        % "2.5.1"  excludeAll(excludeNetty),
         "it.unimi.dsi"             % "fastutil"         % "6.4.4",
         "colt"                     % "colt"             % "1.2.0",
         "org.apache.mesos"         % "mesos"            % "0.12.1",
@@ -271,7 +271,7 @@ object SparkBuild extends Build {
       "org.apache.flume"      % "flume-ng-sdk"     % "1.2.0" % "compile"  excludeAll(excludeNetty, excludeSnappy),
       "com.github.sgroschupf" % "zkclient"         % "0.1"                excludeAll(excludeNetty),
       "org.twitter4j"         % "twitter4j-stream" % "3.0.3"              excludeAll(excludeNetty),
-      "com.typesafe.akka"    %%  "akka-zeromq"     % "2.1.4"              excludeAll(excludeNetty)
+      "com.typesafe.akka"    %%  "akka-zeromq"     % "2.2.1"              excludeAll(excludeNetty)
     )
   )
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 9e14c8ace7..c722aa15ab 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
 import twitter4j.Status
 import twitter4j.auth.Authorization
+import akka.util.ByteString
 
 
 /**
@@ -231,11 +232,11 @@ class StreamingContext private (
   def zeroMQStream[T: ClassTag](
       publisherUrl:String,
       subscribe: Subscribe,
-      bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+      bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
       storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
       supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
     ): DStream[T] = {
-    actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
+    actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
         "ZeroMQReceiver", storageLevel, supervisorStrategy)
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 8135d2499e..8242af6d5f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -29,6 +29,7 @@ import twitter4j.Status
 import akka.actor.Props
 import akka.actor.SupervisorStrategy
 import akka.zeromq.Subscribe
+import akka.util.ByteString
 
 import twitter4j.auth.Authorization
 
@@ -475,7 +476,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   def zeroMQStream[T](
       publisherUrl:String,
       subscribe: Subscribe,
-      bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+      bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
       storageLevel: StorageLevel,
       supervisorStrategy: SupervisorStrategy
     ): JavaDStream[T] = {
@@ -502,7 +503,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     ): JavaDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
     ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
   }
 
@@ -522,7 +523,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     ): JavaDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
     ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index a61a1780f1..394a39fbb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -177,7 +177,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
     logInfo("Attempting to register with tracker")
     val ip = System.getProperty("spark.driver.host", "localhost")
     val port = System.getProperty("spark.driver.port", "7077").toInt
-    val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
+    val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
     val tracker = env.actorSystem.actorFor(url)
     val timeout = 5.seconds
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index c220127c00..ee087a1cf0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -51,7 +51,7 @@ object ReceiverSupervisorStrategy {
  * @example {{{
  *  class MyActor extends Actor with Receiver{
  *      def receive {
- *          case anything :String ⇒ pushBlock(anything)
+ *          case anything :String => pushBlock(anything)
  *      }
  *  }
  *  //Can be plugged in actorStream as follows
@@ -121,7 +121,7 @@ private[streaming] class ActorReceiver[T: ClassTag](
   protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
     "Supervisor" + streamId)
 
-  private class Supervisor extends Actor {
+  class Supervisor extends Actor {
 
     override val supervisorStrategy = receiverSupervisorStrategy
     val worker = context.actorOf(props, name)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
index e009325b67..ce8c56fa8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.receivers
 
 import akka.actor.Actor
+import akka.util.ByteString
 import akka.zeromq._
 
 import org.apache.spark.Logging
@@ -29,7 +30,7 @@ import scala.reflect.ClassTag
  */
 private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
   subscribe: Subscribe,
-  bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
+  bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
   extends Actor with Receiver with Logging {
 
   override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
@@ -40,10 +41,10 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
     case Connecting ⇒ logInfo("connecting ...")
 
     case m: ZMQMessage ⇒
-      logDebug("Received message for:" + m.firstFrameAsString)
+      logDebug("Received message for:" + m.frame(0))
 
       //We ignore first frame for processing as it is the topic
-      val bytes = m.frames.tail.map(_.payload)
+      val bytes = m.frames.tail
       pushBlock(bytesToObjects(bytes))
 
     case Closed ⇒ logInfo("received closed ")
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c0d729ff87..783b8dea31 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -48,7 +48,7 @@ import java.util.*;
 
 import akka.actor.Props;
 import akka.zeromq.Subscribe;
-
+import akka.util.ByteString;
 
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 6d6ef149cc..d222f412a0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -209,7 +209,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
         else {
           // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter)
           val workerId = workerIdCounter.incrementAndGet().toString
-          val driverUrl = "akka://spark@%s:%s/user/%s".format(
+          val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
             System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
             StandaloneSchedulerBackend.ACTOR_NAME)
 
-- 
GitLab