diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index e070a15a54f06abcb892d3a3844864a1061245b3..d1428bcfc67760a4f4e883f3eb18eae74b055e33 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -3,7 +3,7 @@ package spark.deploy.master
 import akka.actor._
 import akka.actor.Terminated
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
-import akka.util.duration._
+import scala.concurrent.duration._
 
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -50,6 +50,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
     startWebUi()
+    import context.dispatcher
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
   }
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 59d59dde78032f7bc71ee8debc75cdf01d5097c9..fe859d48c32e7e44b6947e4f93c50563980f5cb9 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -27,6 +27,7 @@ class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends
 
   implicit val timeout = Timeout(10 seconds)
 
+
   val handler = {
     get {
       (path("") & parameters('format ?)) {
@@ -52,27 +53,11 @@ class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends
                 masterState.completedApps.find(_.id == appId).getOrElse(null)
               })
             }
-<<<<<<< HEAD
           respondWithMediaType(`application/json`) { ctx =>
-            ctx.complete(jobInfo.mapTo[JobInfo])
-          }
-        case (jobId, _) =>
-          complete {
-            val future = (master ? RequestMasterState).mapTo[MasterState]
-            future.map { masterState =>
-              masterState.activeJobs.find(_.id == jobId) match {
-                case Some(job) => spark.deploy.master.html.job_details.render(job)
-                case _ => masterState.completedJobs.find(_.id == jobId) match {
-                  case Some(job) => spark.deploy.master.html.job_details.render(job)
-                  case _ => null
-                }
-              }
-=======
-          respondWithMediaType(MediaTypes.`application/json`) { ctx =>
             ctx.complete(appInfo.mapTo[ApplicationInfo])
           }
         case (appId, _) =>
-          completeWith {
+          complete {
            val future = master ? RequestMasterState
            future.map { state =>
              val masterState = state.asInstanceOf[MasterState]
@@ -80,7 +65,6 @@ class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends
                masterState.completedApps.find(_.id == appId).getOrElse(null)
              })
              spark.deploy.master.html.app_details.render(app)
->>>>>>> 17e076de800ea0d4c55f2bd657348641f6f9c55b
            }
          }
        }
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 700d87b1c1da2e1a8541ad5c7d729f0392e138fb..5bcf00443cac8b98a7a5f00798924417b9de3feb 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -2,7 +2,7 @@ package spark.deploy.worker
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
-import akka.util.duration._
+import scala.concurrent.duration._
 import spark.{Logging, Utils}
 import spark.util.AkkaUtils
 import spark.deploy._
@@ -15,6 +15,7 @@ import spark.deploy.RegisterWorkerFailed
 import spark.deploy.master.Master
 import java.io.File
 
+
 private[spark] class Worker(
     ip: String,
     port: Int,
@@ -81,7 +82,7 @@ private[spark] class Worker(
   }
 
   def startWebUi() {
-    val webUi = new WorkerWebUI(context.system, self, workDir)
+    val webUi = new WorkerWebUI(self, workDir)
     try {
       AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
     } catch {
@@ -95,6 +96,7 @@ private[spark] class Worker(
     case RegisteredWorker(url) =>
       masterWebUiUrl = url
       logInfo("Successfully registered with master")
+      import context.dispatcher
       context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
         master ! Heartbeat(workerId)
       }
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index 99c3b506fa4634c74a349f73a0e37c615a545fe0..33a2a9516eeaf4625c0116cfbf2ca936023c6afd 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -18,8 +18,11 @@ import java.io.File
  * Web UI server for the standalone worker.
  */
 private[spark]
-class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File) extends Directives {
-  val RESOURCE_DIR = "spark/deploy/worker/webui"
+class WorkerWebUI(worker: ActorRef, workDir: File)(implicit val context: ActorContext) extends Directives {
+  import context.dispatcher
+
+  val actorSystem = context.system
+  val RESOURCE_DIR = "spark/deploy/worker/webui"
   val STATIC_RESOURCE_DIR = "spark/deploy/static"
 
   implicit val timeout = Timeout(10 seconds)
@@ -42,9 +45,9 @@ class WorkerWebUI(worker: ActorRef, workDir: File)(implicit val context: ActorContext)
         }
       } ~
       path("log") {
-        parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
+        parameters("appId", "executorId", "logType") { (appId, executorId, logType) =>
          respondWithMediaType(`text/plain`) {
-            getFromFileName(workDir.getPath() + "/" + appId + "/" + executorId + "/" + logType)
+            getFromFile(workDir.getPath() + "/" + appId + "/" + executorId + "/" + logType)
          }
        }
      } ~
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 8f737c5c6a8030fa31b995a852af82bac057cc9d..d3f6cd78dc3a2f0ac5d134b2a9d7aa1e09091959 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -873,7 +873,7 @@ object BlockManager extends Logging {
   }
 
   def getHeartBeatFrequencyFromSystemProperties: Long =
-    System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong
+    System.getProperty("spark.storage.blockManagerHeartBeatMs", "10000").toLong
 
   def getDisableHeartBeatsForTesting: Boolean =
     System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index 9e6721ec17169a8ca33753393329049467b105dd..a3397a0fb452010739d92b60692710eb554571db 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -2,9 +2,9 @@ package spark.storage
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.util.Timeout
-import akka.util.duration._
-import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.Directives
+import scala.concurrent.duration._
+import spray.httpx.TwirlSupport._
+import spray.routing.Directives
 import spark.{Logging, SparkContext}
 import spark.util.AkkaUtils
 import spark.Utils
@@ -17,7 +17,8 @@ private[spark]
 class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
   extends Directives with Logging {
 
-  val STATIC_RESOURCE_DIR = "spark/deploy/static"
+  implicit val implicitActorSystem = actorSystem
+  val STATIC_RESOURCE_DIR = "spark/deploy/static"
 
   implicit val timeout = Timeout(10 seconds)
@@ -31,7 +32,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
       // random port it bound to, so we have to try to find a local one by creating a socket.
       Utils.findFreePort()
     }
-    AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
+    AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler)
     logInfo("Started BlockManager web UI at http://%s:%d".format(Utils.localHostName(), port))
   } catch {
     case e: Exception =>
@@ -43,7 +44,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
   val handler = {
     get {
       path("") {
-        completeWith {
+        complete {
          // Request the current storage status from the Master
          val storageStatusList = sc.getExecutorStorageStatus
          // Calculate macro-level statistics
@@ -58,7 +59,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
      } ~
      path("rdd") {
        parameter("id") { id =>
-          completeWith {
+          complete {
            val prefix = "rdd_" + id.toString
            val storageStatusList = sc.getExecutorStorageStatus
            val filteredStorageStatusList = StorageUtils.
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 6f551b2b9c67572f118e5fe8692b5d7ad8c45814..70338ec4dc5f4a2fabf5e19af8364fd7f0ffe56a 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -32,7 +32,6 @@ private[spark] object AkkaUtils {
     val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
     val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
     val lifecycleEvents = System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean
-    val lifecycleEvents = System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean
     val akkaConf = ConfigFactory.parseString("""
       akka.daemonic = on
       akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index f59334a033d7a00f3b00c4ee9b20713839ee0f39..fb54ccb51e89bb27aaf57558a61eafb767fbb921 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -8,7 +8,7 @@ import scala.math.exp
 import scala.math.signum
 import spark.SparkContext._
 
-class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
+class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
 
   implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 4104b33c8b6815ddebbe50ea595e633fb0cba46e..46b74fe5eeea07422fd925f1b3871dab1ee8adf3 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -257,7 +257,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
 object DistributedSuite {
   // Indicates whether this JVM is marked for failure.
   var mark = false
-  
+
   // Set by test to remember if we are in the driver program so we can assert
   // that we are not.
   var amMaster = false
@@ -274,9 +274,9 @@ object DistributedSuite {
   // Act like an identity function, but if mark was set to true previously, fail,
   // crashing the entire JVM.
   def failOnMarkedIdentity(item: Boolean): Boolean = {
-    if (mark) { 
+    if (mark) {
       System.exit(42)
-    } 
+    }
     item
-  } 
+  }
 }
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 2571e54b048181357add0f669d94ad53c756e0d4..7b61e2ba3eaa3430658b2a7faa50d077a208786e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -178,10 +178,10 @@ object SparkBuild extends Build {
   def streamingSettings = sharedSettings ++ Seq(
     name := "spark-streaming",
     libraryDependencies ++= Seq(
-      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
-      "com.github.sgroschupf" % "zkclient" % "0.1",
-      "org.twitter4j" % "twitter4j-stream" % "3.0.3",
-      "com.typesafe.akka" % "akka-zeromq" % "2.0.3"
+      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
+      "com.github.sgroschupf" % "zkclient" % "0.1",
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3",
+      "com.typesafe.akka" % "akka-zeromq" % "2.1-M1" excludeAll(ExclusionRule(name = "akka-actor"), ExclusionRule(organization = "org.scala-lang"))
     )
   ) ++ assemblySettings ++ extraAssemblySettings
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala
index e83e40376021102edf327ae634c8ce7d99c27dfb..2b6e7b68bf30f4dd76fbff5bb8fc4ba6a9498385 100644
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -222,21 +222,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
     }
   }
 
-  /** Print a welcome message */
-  def printWelcome() {
-    echo("""Welcome to
-      ____              __
-     / __/__  ___ _____/ /__
-    _\ \/ _ \/ _ `/ __/  '_/
-   /___/ .__/\_,_/_/ /_/\_\   version 0.8.0
-      /_/
-""")
-    import Properties._
-    val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
-      versionString, javaVmName, javaVersion)
-    echo(welcomeMsg)
-  }
-
   /** Show the history */
   lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") {
     override def usage = "[num]"
diff --git a/repl/src/main/scala/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/spark/repl/SparkILoopInit.scala
index 6ae535c4e6f2486d461a0fc9e1e8603838ea2756..8b7da3d3c697720ee59d0e334f7988f1f8513de7 100644
--- a/repl/src/main/scala/spark/repl/SparkILoopInit.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoopInit.scala
@@ -24,7 +24,7 @@ trait SparkILoopInit {
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
-   /___/ .__/\_,_/_/ /_/\_\   version 0.7.1-SNAPSHOT
+   /___/ .__/\_,_/_/ /_/\_\   version 0.8.0
       /_/
 """)
     import Properties._
diff --git a/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala b/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
index fd1a1b1e7cd50def36de4a981338231d48281cdc..8f439f0681a05a70ad65387be9350518c6733064 100644
--- a/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
@@ -17,6 +17,7 @@ trait ReplSuiteMixin {
   val localIp = "127.0.1.2"
   val port = "7089"
   val sparkUrl = s"spark://$localIp:$port"
+
   def setupStandaloneCluster() {
     future { Master.main(Array("-i", localIp, "-p", port, "--webui-port", "0")) }
     Thread.sleep(2000)
diff --git a/run b/run
index 4755d562a7f5dd39c0e53626f1293bdb27629a0a..96c7f8a0955fd632172c52546337245300d5457a 100755
--- a/run
+++ b/run
@@ -164,4 +164,4 @@ else
   EXTRA_ARGS="$JAVA_OPTS"
 fi
 
-exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"
+exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index b159d26c02b2d4383253110d1d589fc2a6b4261b..e5bb654578251e4b7651b700c6bc50c809b677f0 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -11,8 +11,8 @@ import scala.collection.mutable.Queue
 
 import akka.actor._
 import akka.pattern.ask
-import akka.util.duration._
-import akka.dispatch._
+import scala.concurrent.duration._
+// import akka.dispatch._
 
 private[streaming] sealed trait NetworkInputTrackerMessage
 private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 7385474963ebb70bfd7f8f86a16282e341824c80..5347374730da79ffe3b194aa0adf1d52fd7e9290 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -7,13 +7,15 @@ import spark.rdd.BlockRDD
 import spark.storage.StorageLevel
 
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
 
 import java.nio.ByteBuffer
 
 import akka.actor.{Props, Actor}
 import akka.pattern.ask
-import akka.dispatch.Await
-import akka.util.duration._
+import scala.concurrent.Await
+import akka.util.Timeout
+
 import spark.streaming.util.{RecurringTimer, SystemClock}
 import java.util.concurrent.ArrayBlockingQueue
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
index b3201d0b28d797c3a8d7e200e19b25fff878730b..6c9e373de3acd6d49ec2ab34d0ea20faab4dbdbb 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -3,6 +3,8 @@ package spark.streaming.receivers
 import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
 import akka.actor.{ actorRef2Scala, ActorRef }
 import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+import akka.actor.SupervisorStrategy._
+import scala.concurrent.duration._
 
 import spark.storage.StorageLevel
 import spark.streaming.dstream.NetworkReceiver
@@ -12,9 +14,6 @@ import java.util.concurrent.atomic.AtomicInteger
 
 /** A helper with set of defaults for supervisor strategy **/
 object ReceiverSupervisorStrategy {
-  import akka.util.duration._
-  import akka.actor.SupervisorStrategy._
-
   val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
     15 millis) {
     case _: RuntimeException ⇒ Restart
@@ -27,10 +26,10 @@ object ReceiverSupervisorStrategy {
  * pushBlock API.
  *
  * @example {{{
- *  class MyActor extends Actor with Receiver{
- *      def receive {
- *          case anything :String ⇒ pushBlock(anything)
- *      }
+ *  class MyActor extends Actor with Receiver{
+ *      def receive {
+ *          case anything :String ⇒ pushBlock(anything)
+ *      }
  * }
  *  //Can be plugged in actorStream as follows
  *  ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
@@ -74,12 +73,12 @@ private[streaming] case class Data[T: ClassManifest](data: T)
 * his own Actor to run as receiver for Spark Streaming input source.
 *
 * This starts a supervisor actor which starts workers and also provides
- * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. 
- 
+ * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ *
 * Here's a way to start more supervisor/workers as its children.
 *
 * @example {{{
- *  context.parent ! Props(new Supervisor)
+ *  context.parent ! Props(new Supervisor)
 * }}} OR {{{
 *  context.parent ! Props(new Worker,"Worker")
 * }}}
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
index 5533c3cf1ef8b5a316aea8b8fa2b9c88f60872fe..e7608f08ae0077ab5701dd603114bbc4d1d93250 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -13,7 +13,7 @@ private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
   bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
   extends Actor with Receiver with Logging {
 
-  override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+  override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
     Connect(publisherUrl), subscribe)
 
   def receive: Receive = {
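The recurring change across Master.scala, Worker.scala, and the streaming receivers is the Akka 2.0 to 2.1 migration: durations now come from scala.concurrent.duration._ instead of akka.util.duration._, and scheduler.schedule requires an implicit ExecutionContext, which the patch supplies via `import context.dispatcher`. Below is a minimal standalone sketch of that idiom, not taken from the patch; the actor, message, and interval are illustrative only.

    import akka.actor.{Actor, ActorSystem, Props}
    import scala.concurrent.duration._

    // Hypothetical actor showing the Akka 2.1 scheduling pattern.
    class HeartbeatActor extends Actor {
      // The actor's dispatcher doubles as the implicit ExecutionContext
      // that scheduler.schedule needs under Akka 2.1.
      import context.dispatcher

      override def preStart() {
        // Fire a tick immediately, then every 5 seconds, analogous to
        // the master ! Heartbeat(workerId) loop in Worker.scala.
        context.system.scheduler.schedule(0 millis, 5000 millis) {
          self ! "tick"
        }
      }

      def receive = {
        case "tick" => // react to the periodic tick
      }
    }

    object HeartbeatDemo extends App {
      val system = ActorSystem("demo")
      system.actorOf(Props(new HeartbeatActor), "heartbeat")
    }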
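NetworkInputDStream.scala makes the matching change on the futures side: akka.dispatch.Await becomes scala.concurrent.Await, with timeouts taken from akka.util.Timeout, because ask now returns a scala.concurrent.Future. A hedged sketch of that ask/Await pattern under Scala 2.10 and Akka 2.1 follows; the Echo actor and names are illustrative, not from Spark.

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object AskAwaitDemo extends App {
      // Trivial actor that echoes whatever it receives.
      class Echo extends Actor {
        def receive = { case msg => sender ! msg }
      }

      val system = ActorSystem("demo")
      val echo = system.actorOf(Props(new Echo), "echo")

      // ask (?) needs an implicit Timeout and yields a scala.concurrent.Future,
      // which is blocked on with scala.concurrent.Await instead of akka.dispatch.Await.
      implicit val timeout = Timeout(5 seconds)
      val reply = Await.result(echo ? "ping", timeout.duration)
      println(reply)

      system.shutdown()
    }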