diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 04c26b2e40faeaec2a30700b1e7f6284f8f2c768..a73438208aa29ef19aaa0ea10b0a34239c057b11 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -5,12 +5,12 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import akka.actor._ -import akka.dispatch._ +import scala.concurrent.Await import akka.pattern.ask import akka.remote._ -import akka.util.Duration +import scala.concurrent.duration.Duration import akka.util.Timeout -import akka.util.duration._ +import scala.concurrent.duration._ import spark.storage.BlockManager import spark.storage.StorageLevel diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index 70eb9f702e689852d7f9bd21c55ac1750bd3e6cb..08d2956782b9c6d163870072e266a8f282d45850 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -8,12 +8,12 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import akka.actor._ -import akka.dispatch._ +import scala.concurrent.Await import akka.pattern.ask import akka.remote._ -import akka.util.Duration +import scala.concurrent.duration.Duration import akka.util.Timeout -import akka.util.duration._ +import scala.concurrent.duration._ import spark.scheduler.MapStatus import spark.storage.BlockManagerId diff --git a/core/src/main/scala/spark/api/java/function/Function.java b/core/src/main/scala/spark/api/java/function/Function.java index dae8295f213ab920d2bbd529b5be175a3ce07f56..9f6ab9a59283f33b347948983d9641bc9b55132d 100644 --- a/core/src/main/scala/spark/api/java/function/Function.java +++ b/core/src/main/scala/spark/api/java/function/Function.java @@ -1,7 +1,7 @@ package spark.api.java.function; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import scala.runtime.AbstractFunction1; import java.io.Serializable; @@ -15,8 +15,8 @@ import java.io.Serializable; public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable { public abstract R call(T t) throws Exception; - public ClassManifest<R> returnType() { - return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<R> returnType() { + return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class); } } diff --git a/core/src/main/scala/spark/api/java/function/Function2.java b/core/src/main/scala/spark/api/java/function/Function2.java index 69bf12c8c9dc9a72941fa3fcb07b880bebc6ecab..b32c178aa39f54f4118db63afc21ba1eeb263407 100644 --- a/core/src/main/scala/spark/api/java/function/Function2.java +++ b/core/src/main/scala/spark/api/java/function/Function2.java @@ -1,7 +1,7 @@ package spark.api.java.function; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import scala.runtime.AbstractFunction2; import java.io.Serializable; @@ -14,8 +14,9 @@ public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R> public abstract R call(T1 t1, T2 t2) throws Exception; - public ClassManifest<R> returnType() { - return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<R> returnType() { + return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class); } + } diff --git a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java index b3cc4df6aa47ae8d3a49c892e7c5727375f7ceca..296a3b5044a38a7a4c40301e3f499386b8653d5f 100644 --- a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java +++ b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java @@ -1,8 +1,8 @@ package spark.api.java.function; import scala.Tuple2; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import scala.runtime.AbstractFunction1; import java.io.Serializable; @@ -19,11 +19,11 @@ public abstract class PairFlatMapFunction<T, K, V> public abstract Iterable<Tuple2<K, V>> call(T t) throws Exception; - public ClassManifest<K> keyType() { - return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<K> keyType() { + return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class); } - public ClassManifest<V> valueType() { - return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<V> valueType() { + return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class); } } diff --git a/core/src/main/scala/spark/api/java/function/PairFunction.java b/core/src/main/scala/spark/api/java/function/PairFunction.java index 9fc6df4b8884474004cd9e7b8df3baf02e88019c..e3f94788e26f75db09c3c4a928e6579be43867dc 100644 --- a/core/src/main/scala/spark/api/java/function/PairFunction.java +++ b/core/src/main/scala/spark/api/java/function/PairFunction.java @@ -1,8 +1,8 @@ package spark.api.java.function; import scala.Tuple2; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import scala.runtime.AbstractFunction1; import java.io.Serializable; @@ -18,11 +18,11 @@ public abstract class PairFunction<T, K, V> public abstract Tuple2<K, V> call(T t) throws Exception; - public ClassManifest<K> keyType() { - return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<K> keyType() { + return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class); } - public ClassManifest<V> valueType() { - return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<V> valueType() { + return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class); } } diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala index 732fa080645b0e7752f507f528d539e67b8f8ddf..12069947fc5ad3c38ee73e170993894d247a096c 100644 --- a/core/src/main/scala/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala @@ -2,7 +2,7 @@ package spark.deploy import master.{JobInfo, WorkerInfo} import worker.ExecutorRunner -import cc.spray.json._ +import spray.json._ /** * spray-json helper class containing implicit conversion to json for marshalling responses diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index 90fe9508cdba9b5df0b438ee4e91447915c2c6fd..0aee7597962af39883ff16c3fbf3b6bd76e14555 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -3,7 +3,7 @@ package spark.deploy.client import spark.deploy._ import akka.actor._ import akka.pattern.ask -import akka.util.duration._ +import scala.concurrent.duration._ import akka.pattern.AskTimeoutException import spark.{SparkException, Logging} import akka.remote.RemoteClientLifeCycleEvent @@ -11,7 +11,7 @@ import akka.remote.RemoteClientShutdown import spark.deploy.RegisterJob import akka.remote.RemoteClientDisconnected import akka.actor.Terminated -import akka.dispatch.Await +import scala.concurrent.Await /** * The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description, diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 6ecebe626a7cef9d7be27fef71c84a6f3f76c8c7..e034312c12ba19ed752e47170da2f5a12bd377d5 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -49,7 +49,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } def startWebUi() { - val webUi = new MasterWebUI(context.system, self) + val webUi = new MasterWebUI(self) try { AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler) } catch { diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala index 458ee2d66589c1f910f9678ffb2eb8d6a2b23a27..a4dadb6ef9484381121afb255da1d4f383c09945 100644 --- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala +++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala @@ -1,39 +1,42 @@ package spark.deploy.master -import akka.actor.{ActorRef, ActorSystem} -import akka.dispatch.Await +import akka.actor.{ActorRef, ActorContext, ActorRefFactory} +import scala.concurrent.Await import akka.pattern.ask import akka.util.Timeout -import akka.util.duration._ -import cc.spray.Directives -import cc.spray.directives._ -import cc.spray.typeconversion.TwirlSupport._ -import cc.spray.http.MediaTypes -import cc.spray.typeconversion.SprayJsonSupport._ +import scala.concurrent.duration._ +import spray.routing.Directives +import spray.routing.directives._ +import spray.httpx.TwirlSupport._ +import spray.httpx.SprayJsonSupport._ +import spray.http.MediaTypes._ import spark.deploy._ import spark.deploy.JsonProtocol._ private[spark] -class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives { +class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends Directives { + import context.dispatcher + + val actorSystem = context.system val RESOURCE_DIR = "spark/deploy/master/webui" val STATIC_RESOURCE_DIR = "spark/deploy/static" - + implicit val timeout = Timeout(1 seconds) - + val handler = { get { (path("") & parameters('format ?)) { case Some(js) if js.equalsIgnoreCase("json") => - val future = master ? RequestMasterState - respondWithMediaType(MediaTypes.`application/json`) { ctx => + val future = (master ? RequestMasterState).mapTo[MasterState] + respondWithMediaType(`application/json`) { ctx => ctx.complete(future.mapTo[MasterState]) } case _ => - completeWith { - val future = master ? RequestMasterState + complete { + val future = (master ? RequestMasterState).mapTo[MasterState] future.map { - masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState]) + masterState => spark.deploy.master.html.index.render(masterState) } } } ~ @@ -50,15 +53,13 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct } } } - respondWithMediaType(MediaTypes.`application/json`) { ctx => + respondWithMediaType(`application/json`) { ctx => ctx.complete(jobInfo.mapTo[JobInfo]) } case (jobId, _) => - completeWith { - val future = master ? RequestMasterState - future.map { state => - val masterState = state.asInstanceOf[MasterState] - + complete { + val future = (master ? RequestMasterState).mapTo[MasterState] + future.map { masterState => masterState.activeJobs.find(_.id == jobId) match { case Some(job) => spark.deploy.master.html.job_details.render(job) case _ => masterState.completedJobs.find(_.id == jobId) match { diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 7c9e588ea2d3277bb85a800c429fdc6179da1727..ec25a19e7b9cb5a86d6ace874a7410496820c655 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -100,7 +100,7 @@ private[spark] class Worker( } def startWebUi() { - val webUi = new WorkerWebUI(context.system, self) + val webUi = new WorkerWebUI(self) try { AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler) } catch { diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala index f9489d99fc13fe2e70a9cda105fd22ce9fa6e3b4..7dd1781900ea873b45ce257b09fc793df04829ea 100644 --- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala @@ -1,46 +1,49 @@ package spark.deploy.worker -import akka.actor.{ActorRef, ActorSystem} -import akka.dispatch.Await +import akka.actor.{ActorRef, ActorContext} +import scala.concurrent.Await import akka.pattern.ask import akka.util.Timeout -import akka.util.duration._ -import cc.spray.Directives -import cc.spray.typeconversion.TwirlSupport._ -import cc.spray.http.MediaTypes -import cc.spray.typeconversion.SprayJsonSupport._ +import scala.concurrent.duration._ +import spray.routing.Directives +import spray.httpx.TwirlSupport._ +import spray.httpx.SprayJsonSupport._ +import spray.http.MediaTypes._ import spark.deploy.{WorkerState, RequestWorkerState} import spark.deploy.JsonProtocol._ private[spark] -class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives { - val RESOURCE_DIR = "spark/deploy/worker/webui" +class WorkerWebUI(worker: ActorRef)(implicit val context: ActorContext) extends Directives { + import context.dispatcher + + val actorSystem = context.system + val RESOURCE_DIR = "spark/deploy/worker/webui" val STATIC_RESOURCE_DIR = "spark/deploy/static" - + implicit val timeout = Timeout(1 seconds) - + val handler = { get { - (path("") & parameters('format ?)) { + (path("") & parameters('format ?)) { case Some(js) if js.equalsIgnoreCase("json") => { - val future = worker ? RequestWorkerState - respondWithMediaType(MediaTypes.`application/json`) { ctx => - ctx.complete(future.mapTo[WorkerState]) + val future = (worker ? RequestWorkerState).mapTo[WorkerState] + respondWithMediaType(`application/json`) { ctx => + ctx.complete(future) } } case _ => - completeWith{ - val future = worker ? RequestWorkerState + complete { + val future = (worker ? RequestWorkerState).mapTo[WorkerState] future.map { workerState => - spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState]) + spark.deploy.worker.html.index(workerState) } } } ~ path("log") { parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) => - respondWithMediaType(cc.spray.http.MediaTypes.`text/plain`) { - getFromFileName("work/" + jobId + "/" + executorId + "/" + logType) + respondWithMediaType(`text/plain`) { + getFromFile("work/" + jobId + "/" + executorId + "/" + logType) } } } ~ diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 36c01ad629bbb938ca9b381748105b3db2f50c54..04b303afe07ceaafb41c77e5d0b95ff728381a8b 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -14,9 +14,9 @@ import scala.collection.mutable.SynchronizedQueue import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer -import akka.dispatch.{Await, Promise, ExecutionContext, Future} -import akka.util.Duration -import akka.util.duration._ +import scala.concurrent.{Await, Promise, ExecutionContext, Future} +import scala.concurrent.duration.Duration +import scala.concurrent.duration._ private[spark] case class ConnectionManagerId(host: String, port: Int) { def toSocketAddress() = new InetSocketAddress(host, port) diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala index 533e4610f3a4c88c3b6f4b2dbb6503893a5cf25b..1b5b0935c2fc303ab9dd8964550061186c5734c3 100644 --- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala +++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala @@ -8,8 +8,8 @@ import scala.io.Source import java.nio.ByteBuffer import java.net.InetAddress -import akka.dispatch.Await -import akka.util.duration._ +import scala.concurrent.Await +import scala.concurrent.duration._ private[spark] object ConnectionManagerTest extends Logging{ def main(args: Array[String]) { diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index eeaae23dc86978abb285ecb776aafa0e049baf8e..03dc5f4b9b4abff52968c22fd74a52c014cd21c6 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -3,11 +3,11 @@ package spark.scheduler.cluster import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import akka.actor._ -import akka.util.duration._ +import scala.concurrent.duration._ import akka.pattern.ask import spark.{SparkException, Logging, TaskState} -import akka.dispatch.Await +import scala.concurrent.Await import java.util.concurrent.atomic.AtomicInteger import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 7a8ac10cdd88e51ebd06588ca6b7801cc9a693d1..7a1344668fbcad484458ed9e82b9fa12ef8a4723 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -8,9 +8,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} import scala.collection.JavaConversions._ import akka.actor.{ActorSystem, Cancellable, Props} -import akka.dispatch.{Await, Future} -import akka.util.Duration -import akka.util.duration._ +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index a3d8671834dbc676c83e4a739c40462127a93e93..46e1860d09fff10868c8a4e8c7093f1b2cd12c91 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -4,10 +4,9 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Random import akka.actor.{Actor, ActorRef, ActorSystem, Props} -import akka.dispatch.Await +import scala.concurrent.Await import akka.pattern.ask -import akka.util.{Duration, Timeout} -import akka.util.duration._ +import scala.concurrent.duration._ import spark.{Logging, SparkException, Utils} diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index f4d026da3329c801775275899a7f2dd94136f6dc..9bf9161c312552c931326322fe4ca9cdf9a258b9 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -7,8 +7,7 @@ import scala.collection.JavaConversions._ import scala.util.Random import akka.actor.{Actor, ActorRef, Cancellable} -import akka.util.{Duration, Timeout} -import akka.util.duration._ +import scala.concurrent.duration._ import spark.{Logging, Utils} @@ -42,6 +41,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { override def preStart() { if (!BlockManager.getDisableHeartBeatsForTesting) { + import context.dispatcher timeoutCheckingTask = context.system.scheduler.schedule( 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) } diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index e67cb0336d06639982d87769abe89447f8918258..6fd9aa70fcff8e6012bc4ba960e2200bdb794b90 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -1,16 +1,16 @@ package spark.util -import akka.actor.{Props, ActorSystemImpl, ActorSystem} +import akka.actor.{Props, ActorSystem, ExtendedActorSystem} import com.typesafe.config.ConfigFactory -import akka.util.duration._ +import scala.concurrent.duration._ import akka.pattern.ask import akka.remote.RemoteActorRefProvider -import cc.spray.Route -import cc.spray.io.IoWorker -import cc.spray.{SprayCanRootService, HttpService} -import cc.spray.can.server.HttpServer -import cc.spray.io.pipelines.MessageHandlerDispatch.SingletonHandler -import akka.dispatch.Await +import spray.routing.Route +import spray.io.IOExtension +import spray.routing.HttpServiceActor +import spray.can.server.{HttpServer, ServerSettings} +import spray.io.SingletonHandler +import scala.concurrent.Await import spark.SparkException import java.util.concurrent.TimeoutException @@ -23,11 +23,11 @@ private[spark] object AkkaUtils { * ActorSystem itself and its port (which is hard to get from Akka). */ def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { - val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt + val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt - val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt + val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt - val akkaConf = ConfigFactory.parseString(""" + val akkaConf = ConfigFactory.parseString(""" akka.daemonic = on akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] akka.actor.provider = "akka.remote.RemoteActorRefProvider" @@ -44,7 +44,7 @@ private[spark] object AkkaUtils { // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a // hack because Akka doesn't let you figure out the port through the public API yet. - val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider + val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get return (actorSystem, boundPort) } @@ -54,14 +54,13 @@ private[spark] object AkkaUtils { * handle requests. Throws a SparkException if this fails. */ def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) { - val ioWorker = new IoWorker(actorSystem).start() - val httpService = actorSystem.actorOf(Props(new HttpService(route))) - val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService))) - val server = actorSystem.actorOf( - Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer") - actorSystem.registerOnTermination { ioWorker.stop() } + val ioWorker = IOExtension(actorSystem).ioBridge() + val httpService = actorSystem.actorOf(Props(HttpServiceActor(route))) + val server = actorSystem.actorOf( + Props(new HttpServer(ioWorker, SingletonHandler(httpService), ServerSettings())), name = "HttpServer") + actorSystem.registerOnTermination { actorSystem.stop(ioWorker) } val timeout = 3.seconds - val future = server.ask(HttpServer.Bind(ip, port))(timeout) + val future = server.ask(HttpServer.Bind(ip, port))(timeout) try { Await.result(future, timeout) match { case bound: HttpServer.Bound => diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala index 467605981b3d99b886adc61b981340710e21b760..d0c2bd47fc02b0fad16722db332fc3c80e69de1a 100644 --- a/core/src/test/scala/spark/CacheTrackerSuite.scala +++ b/core/src/test/scala/spark/CacheTrackerSuite.scala @@ -5,20 +5,19 @@ import org.scalatest.FunSuite import scala.collection.mutable.HashMap import akka.actor._ -import akka.dispatch._ -import akka.pattern.ask +import scala.concurrent.{Await, Future} import akka.remote._ -import akka.util.Duration +import scala.concurrent.duration.Duration import akka.util.Timeout -import akka.util.duration._ +import scala.concurrent.duration._ class CacheTrackerSuite extends FunSuite { // Send a message to an actor and wait for a reply, in a blocking manner private def ask(actor: ActorRef, message: Any): Any = { try { val timeout = 10.seconds - val future = actor.ask(message)(timeout) - return Await.result(future, timeout) + val future: Future[Any] = akka.pattern.ask(actor, message)(timeout) + Await.result(future, timeout) } catch { case e: Exception => throw new SparkException("Error communicating with actor", e) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 219674028eb7d2884986a706c121217f924a3869..d0b3c350f12bd871c0b6d26406d486fa6c8b4ccd 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -17,11 +17,11 @@ object SparkBuild extends Build { //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1" //val HADOOP_MAJOR_VERSION = "2" - lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel) + lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, /*repl,*/ examples, bagel) lazy val core = Project("core", file("core"), settings = coreSettings) - lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) +// lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) @@ -32,10 +32,10 @@ object SparkBuild extends Build { lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") def sharedSettings = Defaults.defaultSettings ++ Seq( - organization := "org.spark-project", - version := "0.7.0-SNAPSHOT", - scalaVersion := "2.9.2", - scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue + organization := "org.spark-project", + version := "0.7.0-SNAPSHOT", + scalaVersion := "2.10.0", + scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, retrieveManaged := true, retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", @@ -87,11 +87,11 @@ object SparkBuild extends Build { */ libraryDependencies ++= Seq( - "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011", - "org.scalatest" %% "scalatest" % "1.8" % "test", - "org.scalacheck" %% "scalacheck" % "1.9" % "test", - "com.novocode" % "junit-interface" % "0.8" % "test" - ), + "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011", + "org.scalatest" %% "scalatest" % "1.9.1" % "test", + "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", + "com.novocode" % "junit-interface" % "0.8" % "test" + ), parallelExecution := false, /* Workaround for issue #206 (fixed after SBT 0.11.0) */ watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task, @@ -112,31 +112,33 @@ object SparkBuild extends Build { name := "spark-core", resolvers ++= Seq( "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/", - "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", - "Spray Repository" at "http://repo.spray.cc/", + "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", + "Spray Repository" at "http://repo.spray.cc/", "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/" ), libraryDependencies ++= Seq( - "com.google.guava" % "guava" % "11.0.1", - "log4j" % "log4j" % "1.2.16", - "org.slf4j" % "slf4j-api" % slf4jVersion, - "org.slf4j" % "slf4j-log4j12" % slf4jVersion, - "com.ning" % "compress-lzf" % "0.8.4", - "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, - "asm" % "asm-all" % "3.3.1", - "com.google.protobuf" % "protobuf-java" % "2.4.1", - "de.javakaffee" % "kryo-serializers" % "0.20", - "com.typesafe.akka" % "akka-actor" % "2.0.3", - "com.typesafe.akka" % "akka-remote" % "2.0.3", - "com.typesafe.akka" % "akka-slf4j" % "2.0.3", - "it.unimi.dsi" % "fastutil" % "6.4.4", - "colt" % "colt" % "1.2.0", - "cc.spray" % "spray-can" % "1.0-M2.1", - "cc.spray" % "spray-server" % "1.0-M2.1", - "cc.spray" %% "spray-json" % "1.1.1", - "org.apache.mesos" % "mesos" % "0.9.0-incubating" - ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq, + "com.google.guava" % "guava" % "11.0.1", + "log4j" % "log4j" % "1.2.16", + "org.slf4j" % "slf4j-api" % slf4jVersion, + "org.slf4j" % "slf4j-log4j12" % slf4jVersion, + "com.ning" % "compress-lzf" % "0.8.4", + "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, + "asm" % "asm-all" % "3.3.1", + "com.google.protobuf" % "protobuf-java" % "2.4.1", + "de.javakaffee" % "kryo-serializers" % "0.20", + "com.typesafe.akka" %% "akka-remote" % "2.1.0", + "com.typesafe.akka" %% "akka-slf4j" % "2.1.0", + "it.unimi.dsi" % "fastutil" % "6.4.4", + "io.spray" % "spray-can" % "1.1-M7", + "io.spray" % "spray-io" % "1.1-M7", + "io.spray" % "spray-routing" % "1.1-M7", + "io.spray" %% "spray-json" % "1.2.3", + "colt" % "colt" % "1.2.0", + "org.apache.mesos" % "mesos" % "0.9.0-incubating", + "org.scala-lang" % "scala-actors" % "2.10.0" + ) ++ (if (HADOOP_MAJOR_VERSION == "2") + Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq, unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") } ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings @@ -144,10 +146,10 @@ object SparkBuild extends Build { publish := {} ) - def replSettings = sharedSettings ++ Seq( +/* def replSettings = sharedSettings ++ Seq( name := "spark-repl", libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _) - ) + )*/ def examplesSettings = sharedSettings ++ Seq( name := "spark-examples" diff --git a/project/build.properties b/project/build.properties index d4287112c6afb76c00419432dbc7aa79945f09ee..4474a03e1aa9671540ef1677b5579ef91481f503 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.11.3 +sbt.version=0.12.1 diff --git a/project/plugins.sbt b/project/plugins.sbt index 4d0e696a113581cad40119265e1e86f67ec0ab9a..0e4eb7085c01fa6757b185313472d6cb14b268b8 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,11 +6,11 @@ resolvers += "Spray Repository" at "http://repo.spray.cc/" addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.3") -addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1") +addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1") -addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0") +addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0") -addSbtPlugin("cc.spray" %% "sbt-twirl" % "0.5.2") +addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1") // For Sonatype publishing //resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)