From ad88f083a627ba38e99b1b135a82a1fcfd107444 Mon Sep 17 00:00:00 2001
From: Prashant Sharma <prashant.s@imaginea.com>
Date: Wed, 24 Apr 2013 18:08:26 +0530
Subject: [PATCH] scala 2.10 and master merge
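
Merge of the scala-2.10 branch with master. The recurring changes in the
diff below:

 * akka.util.duration._ is replaced by scala.concurrent.duration._, and
   akka.dispatch.Await by scala.concurrent.Await (Akka 2.1 / Scala 2.10).
 * Akka 2.1's scheduler takes an implicit ExecutionContext, hence the new
   `import context.dispatcher` lines next to each schedule() call.
 * spray's routing DSL moved from cc.spray to spray.routing;
   completeWith { ... } becomes complete { ... } and getFromFileName
   becomes getFromFile.
 * The ZeroMQ socket factory is now reached through
   ZeroMQExtension(system).

As a reading aid, a minimal sketch of the new scheduler idiom; the actor
and message below are invented for illustration and are not part of this
patch:

    import akka.actor.Actor
    import scala.concurrent.duration._

    class HeartbeatActor extends Actor {
      // Akka 2.1's schedule() needs an implicit ExecutionContext in
      // scope; the enclosing actor's dispatcher is the usual choice.
      import context.dispatcher

      override def preStart() {
        // Fire "beat" immediately, then every 5 seconds.
        context.system.scheduler.schedule(0 millis, 5000 millis) {
          self ! "beat"
        }
      }

      def receive = {
        case "beat" => () // heartbeat handling would go here
      }
    }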
---
 .../scala/spark/deploy/master/Master.scala    |  3 ++-
 .../spark/deploy/master/MasterWebUI.scala     | 20 ++-----------------
 .../scala/spark/deploy/worker/Worker.scala    |  6 ++++--
 .../spark/deploy/worker/WorkerWebUI.scala     | 11 ++++++----
 .../scala/spark/storage/BlockManager.scala    |  2 +-
 .../scala/spark/storage/BlockManagerUI.scala  | 15 +++++++-------
 .../src/main/scala/spark/util/AkkaUtils.scala |  1 -
 .../test/scala/spark/AccumulatorSuite.scala   |  2 +-
 .../test/scala/spark/DistributedSuite.scala   |  8 ++++----
 project/SparkBuild.scala                      |  8 ++++----
 .../main/scala/spark/repl/SparkILoop.scala    | 15 --------------
 .../scala/spark/repl/SparkILoopInit.scala     |  2 +-
 .../scala/spark/repl/ReplSuiteMixin.scala     |  1 +
 run                                           |  2 +-
 .../spark/streaming/NetworkInputTracker.scala |  4 ++--
 .../dstream/NetworkInputDStream.scala         |  6 ++++--
 .../streaming/receivers/ActorReceiver.scala   | 19 +++++++++---------
 .../streaming/receivers/ZeroMQReceiver.scala  |  2 +-
 18 files changed, 52 insertions(+), 75 deletions(-)
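Reviewer note (kept below the --- fold, outside the commit message): the
web UI changes are mostly mechanical renames in spray's routing DSL. A
rough sketch of the resulting route shape, under those assumptions; the
StatusUI class and "status" path are invented for illustration and appear
nowhere in this patch:

    import spray.routing.Directives

    class StatusUI extends Directives {
      // spray.routing's `complete` takes the marshallable result
      // directly, where cc.spray used the completeWith { ... } directive.
      val handler =
        path("status") {
          get {
            complete("ok")
          }
        }
    }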
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index e070a15a54..d1428bcfc6 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -3,7 +3,7 @@ package spark.deploy.master
 import akka.actor._
 import akka.actor.Terminated
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
-import akka.util.duration._
+import scala.concurrent.duration._
 
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -50,6 +50,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
     startWebUi()
+    import context.dispatcher
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
   }
 
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 59d59dde78..fe859d48c3 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -27,6 +27,7 @@ class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends
 
   implicit val timeout = Timeout(10 seconds)
 
+
   val handler = {
     get {
       (path("") & parameters('format ?)) {
@@ -52,27 +53,11 @@ class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends
                 masterState.completedApps.find(_.id == appId).getOrElse(null)
               })
           }
-<<<<<<< HEAD
           respondWithMediaType(`application/json`) { ctx =>
-            ctx.complete(jobInfo.mapTo[JobInfo])
-          }
-        case (jobId, _) =>
-          complete {
-            val future = (master ? RequestMasterState).mapTo[MasterState]
-            future.map { masterState =>
-              masterState.activeJobs.find(_.id == jobId) match {
-                case Some(job) => spark.deploy.master.html.job_details.render(job)
-                case _ => masterState.completedJobs.find(_.id == jobId) match {
-                  case Some(job) => spark.deploy.master.html.job_details.render(job)
-                  case _ => null
-                }
-              }
-=======
-          respondWithMediaType(MediaTypes.`application/json`) { ctx =>
             ctx.complete(appInfo.mapTo[ApplicationInfo])
           }
         case (appId, _) =>
-          completeWith {
+          complete {
             val future = master ? RequestMasterState
             future.map { state =>
               val masterState = state.asInstanceOf[MasterState]
@@ -80,7 +65,6 @@ class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends
                 masterState.completedApps.find(_.id == appId).getOrElse(null)
               })
               spark.deploy.master.html.app_details.render(app)
->>>>>>> 17e076de800ea0d4c55f2bd657348641f6f9c55b
           }
       }
     }
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 700d87b1c1..5bcf00443c 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -2,7 +2,7 @@ package spark.deploy.worker
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
-import akka.util.duration._
+import scala.concurrent.duration._
 import spark.{Logging, Utils}
 import spark.util.AkkaUtils
 import spark.deploy._
@@ -15,6 +15,7 @@ import spark.deploy.RegisterWorkerFailed
 import spark.deploy.master.Master
 import java.io.File
 
+
 private[spark] class Worker(
     ip: String,
     port: Int,
@@ -81,7 +82,7 @@ private[spark] class Worker(
   }
 
   def startWebUi() {
-    val webUi = new WorkerWebUI(context.system, self, workDir)
+    val webUi = new WorkerWebUI(self, workDir)
     try {
       AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
     } catch {
@@ -95,6 +96,7 @@ private[spark] class Worker(
     case RegisteredWorker(url) =>
       masterWebUiUrl = url
       logInfo("Successfully registered with master")
+      import context.dispatcher
       context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
         master ! Heartbeat(workerId)
       }
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index 99c3b506fa..33a2a9516e 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -18,8 +18,11 @@ import java.io.File
  * Web UI server for the standalone worker.
  */
 private[spark]
-class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File) extends Directives {
-  val RESOURCE_DIR = "spark/deploy/worker/webui"
+class WorkerWebUI(worker: ActorRef, workDir: File)(implicit val context: ActorContext) extends Directives {
+  import context.dispatcher
+
+  val actorSystem = context.system
+  val RESOURCE_DIR = "spark/deploy/worker/webui"
   val STATIC_RESOURCE_DIR = "spark/deploy/static"
 
   implicit val timeout = Timeout(10 seconds)
@@ -42,9 +45,9 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File)
         }
       } ~
       path("log") {
-        parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
+        parameters("appId", "executorId", "logType") { (appId, executorId, logType) =>
           respondWithMediaType(`text/plain`) {
-            getFromFileName(workDir.getPath() + "/" + appId + "/" + executorId + "/" + logType)
+            getFromFile(workDir.getPath() + "/" + appId + "/" + executorId + "/" + logType)
           }
         }
       } ~
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 8f737c5c6a..d3f6cd78dc 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -873,7 +873,7 @@ object BlockManager extends Logging {
   }
 
   def getHeartBeatFrequencyFromSystemProperties: Long =
-    System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong
+    System.getProperty("spark.storage.blockManagerHeartBeatMs", "10000").toLong
 
   def getDisableHeartBeatsForTesting: Boolean =
     System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index 9e6721ec17..a3397a0fb4 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -2,9 +2,9 @@ package spark.storage
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.util.Timeout
-import akka.util.duration._
-import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.Directives
+import scala.concurrent.duration._
+import spray.httpx.TwirlSupport._
+import spray.routing.Directives
 import spark.{Logging, SparkContext}
 import spark.util.AkkaUtils
 import spark.Utils
@@ -17,7 +17,8 @@ private[spark]
 class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
   extends Directives with Logging {
 
-  val STATIC_RESOURCE_DIR = "spark/deploy/static"
+  implicit val implicitActorSystem = actorSystem
+  val STATIC_RESOURCE_DIR = "spark/deploy/static"
 
   implicit val timeout = Timeout(10 seconds)
 
@@ -31,7 +32,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
         // random port it bound to, so we have to try to find a local one by creating a socket.
         Utils.findFreePort()
       }
-      AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
+      AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler)
       logInfo("Started BlockManager web UI at http://%s:%d".format(Utils.localHostName(), port))
     } catch {
       case e: Exception =>
@@ -43,7 +44,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
   val handler = {
     get {
       path("") {
-        completeWith {
+        complete {
           // Request the current storage status from the Master
           val storageStatusList = sc.getExecutorStorageStatus
           // Calculate macro-level statistics
@@ -58,7 +59,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
       } ~
       path("rdd") {
         parameter("id") { id =>
-          completeWith {
+          complete {
             val prefix = "rdd_" + id.toString
             val storageStatusList = sc.getExecutorStorageStatus
             val filteredStorageStatusList = StorageUtils.
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 6f551b2b9c..70338ec4dc 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -32,7 +32,6 @@ private[spark] object AkkaUtils {
     val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
     val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
     val lifecycleEvents = System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean
-    val lifecycleEvents = System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean
     val akkaConf = ConfigFactory.parseString("""
       akka.daemonic = on
       akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index f59334a033..fb54ccb51e 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -8,7 +8,7 @@ import scala.math.exp
 import scala.math.signum
 import spark.SparkContext._
 
-class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
+class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
 
   implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
 
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 4104b33c8b..46b74fe5ee 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -257,7 +257,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
 object DistributedSuite {
   // Indicates whether this JVM is marked for failure.
   var mark = false
-  
+
   // Set by test to remember if we are in the driver program so we can assert
   // that we are not.
   var amMaster = false
@@ -274,9 +274,9 @@ object DistributedSuite {
   // Act like an identity function, but if mark was set to true previously, fail,
   // crashing the entire JVM.
   def failOnMarkedIdentity(item: Boolean): Boolean = {
-    if (mark) { 
+    if (mark) {
       System.exit(42)
-    } 
+    }
     item
-  } 
+  }
 }
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 2571e54b04..7b61e2ba3e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -178,10 +178,10 @@ object SparkBuild extends Build {
   def streamingSettings = sharedSettings ++ Seq(
     name := "spark-streaming",
     libraryDependencies ++= Seq(
-    "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
-    "com.github.sgroschupf" % "zkclient" % "0.1",
-    "org.twitter4j" % "twitter4j-stream" % "3.0.3",
-    "com.typesafe.akka" % "akka-zeromq" % "2.0.3"
+      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
+      "com.github.sgroschupf" % "zkclient" % "0.1",
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3",
+      "com.typesafe.akka" % "akka-zeromq" % "2.1-M1" excludeAll(ExclusionRule(name = "akka-actor"), ExclusionRule(organization = "org.scala-lang"))
     )
   ) ++ assemblySettings ++ extraAssemblySettings
 
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala
index e83e403760..2b6e7b68bf 100644
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -222,21 +222,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
     }
   }
 
-  /** Print a welcome message */
-  def printWelcome() {
-    echo("""Welcome to
-      ____              __
-     / __/__  ___ _____/ /__
-    _\ \/ _ \/ _ `/ __/  '_/
-   /___/ .__/\_,_/_/ /_/\_\   version 0.8.0
-      /_/
-""")
-    import Properties._
-    val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
-      versionString, javaVmName, javaVersion)
-    echo(welcomeMsg)
-  }
-
   /** Show the history */
   lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") {
     override def usage = "[num]"
diff --git a/repl/src/main/scala/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/spark/repl/SparkILoopInit.scala
index 6ae535c4e6..8b7da3d3c6 100644
--- a/repl/src/main/scala/spark/repl/SparkILoopInit.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoopInit.scala
@@ -24,7 +24,7 @@ trait SparkILoopInit {
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
-   /___/ .__/\_,_/_/ /_/\_\   version 0.7.1-SNAPSHOT
+   /___/ .__/\_,_/_/ /_/\_\   version 0.8.0
       /_/
 """)
     import Properties._
diff --git a/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala b/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
index fd1a1b1e7c..8f439f0681 100644
--- a/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
@@ -17,6 +17,7 @@ trait ReplSuiteMixin {
   val localIp = "127.0.1.2"
   val port = "7089"
   val sparkUrl = s"spark://$localIp:$port"
+
   def setupStandaloneCluster() {
     future { Master.main(Array("-i", localIp, "-p", port, "--webui-port", "0")) }
     Thread.sleep(2000)
diff --git a/run b/run
index 4755d562a7..96c7f8a095 100755
--- a/run
+++ b/run
@@ -164,4 +164,4 @@ else
   EXTRA_ARGS="$JAVA_OPTS"
 fi
 
-exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@" 
+exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index b159d26c02..e5bb654578 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -11,8 +11,8 @@ import scala.collection.mutable.Queue
 
 import akka.actor._
 import akka.pattern.ask
-import akka.util.duration._
-import akka.dispatch._
+import scala.concurrent.duration._
+// import akka.dispatch._
 
 private[streaming] sealed trait NetworkInputTrackerMessage
 private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 7385474963..5347374730 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -7,13 +7,15 @@ import spark.rdd.BlockRDD
 import spark.storage.StorageLevel
 
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
 
 import java.nio.ByteBuffer
 
 import akka.actor.{Props, Actor}
 import akka.pattern.ask
-import akka.dispatch.Await
-import akka.util.duration._
+import scala.concurrent.Await
+import akka.util.Timeout
+
 import spark.streaming.util.{RecurringTimer, SystemClock}
 import java.util.concurrent.ArrayBlockingQueue
 
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
index b3201d0b28..6c9e373de3 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -3,6 +3,8 @@ package spark.streaming.receivers
 import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
 import akka.actor.{ actorRef2Scala, ActorRef }
 import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+import akka.actor.SupervisorStrategy._
+import scala.concurrent.duration._
 
 import spark.storage.StorageLevel
 import spark.streaming.dstream.NetworkReceiver
@@ -12,9 +14,6 @@ import java.util.concurrent.atomic.AtomicInteger
 
 /** A helper with set of defaults for supervisor strategy **/
 object ReceiverSupervisorStrategy {
-  import akka.util.duration._
-  import akka.actor.SupervisorStrategy._
-
   val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
     15 millis) {
     case _: RuntimeException ⇒ Restart
@@ -27,10 +26,10 @@ object ReceiverSupervisorStrategy {
 * pushBlock API.
 *
 * @example {{{
- *  class MyActor extends Actor with Receiver{
- *      def receive {
- *          case anything :String ⇒ pushBlock(anything)
- *      }
+ *  class MyActor extends Actor with Receiver{
+ *    def receive {
+ *      case anything :String ⇒ pushBlock(anything)
+ *    }
 *  }
 *  //Can be plugged in actorStream as follows
 *  ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
@@ -74,12 +73,12 @@ private[streaming] case class Data[T: ClassManifest](data: T)
 * his own Actor to run as receiver for Spark Streaming input source.
 *
 * This starts a supervisor actor which starts workers and also provides
- * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. 
- * 
+ * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ *
 * Here's a way to start more supervisor/workers as its children.
 *
 * @example {{{
- *  context.parent ! Props(new Supervisor) 
+ *  context.parent ! Props(new Supervisor)
 * }}} OR {{{
 *  context.parent ! Props(new Worker,"Worker")
 * }}}
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
index 5533c3cf1e..e7608f08ae 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -13,7 +13,7 @@ private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
   bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
   extends Actor with Receiver with Logging {
 
-  override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+  override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
     Connect(publisherUrl), subscribe)
 
   def receive: Receive = {
-- 
GitLab