diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 2e8cb609b262967fc371997cf14bd0729226cf75..8a06642426ba2d4d80e4d3d5e8f942207f8c86eb 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import spark.scheduler.DAGScheduler
 import spark.scheduler.TaskScheduler
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.ClusterScheduler
-import spark.scheduler.mesos.MesosScheduler
+import spark.scheduler.mesos.MesosSchedulerBackend
 import spark.storage.BlockManagerMaster
 
 class SparkContext(
@@ -90,14 +90,14 @@ class SparkContext(
       case _ =>
         MesosNativeLibrary.load()
         val sched = new ClusterScheduler(this)
-        val schedContext = new MesosScheduler(sched, this, master, frameworkName)
+        val schedContext = new MesosSchedulerBackend(sched, this, master, frameworkName)
         sched.initialize(schedContext)
         sched
         /*
         if (System.getProperty("spark.mesos.coarse", "false") == "true") {
           new CoarseMesosScheduler(this, master, frameworkName)
         } else {
-          new MesosScheduler(this, master, frameworkName)
+          new MesosSchedulerBackend(this, master, frameworkName)
         }
         */
     }
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index ac30ae9aeca10a80055d6847beaf037667e306c7..e3958cec519521f43ce811bfa68f58f114c0c093 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -47,11 +47,11 @@ class Executor extends Logging {
       1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
   }
 
-  def launchTask(context: ExecutorContext, taskId: Long, serializedTask: ByteBuffer) {
+  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
   }
 
-  class TaskRunner(context: ExecutorContext, taskId: Long, serializedTask: ByteBuffer)
+  class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
     extends Runnable {
 
     override def run() {
diff --git a/core/src/main/scala/spark/executor/ExecutorContext.scala b/core/src/main/scala/spark/executor/ExecutorBackend.scala
similarity index 59%
rename from core/src/main/scala/spark/executor/ExecutorContext.scala
rename to core/src/main/scala/spark/executor/ExecutorBackend.scala
index 6b86d8d18a955b173852d8db66fc742aafbd8601..24c8776f31259c670b4d35aab73d3d3ba236b71d 100644
--- a/core/src/main/scala/spark/executor/ExecutorContext.scala
+++ b/core/src/main/scala/spark/executor/ExecutorBackend.scala
@@ -4,8 +4,8 @@ import java.nio.ByteBuffer
 import spark.TaskState.TaskState
 
 /**
- * Interface used by Executor to send back updates to the cluster scheduler.
+ * A pluggable interface used by the Executor to send updates to the cluster scheduler.
  */
-trait ExecutorContext {
+trait ExecutorBackend {
   def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
 }
diff --git a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
similarity index 92%
rename from core/src/main/scala/spark/executor/MesosExecutorRunner.scala
rename to core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index f97d9d0bfa1506ada3241b5b89eba1d54cfd6d95..50f4e41edecb8d780f9b099fac242a152fa800e2 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -8,9 +8,9 @@ import com.google.protobuf.ByteString
 import spark.{Utils, Logging}
 import spark.TaskState
 
-class MesosExecutorRunner(executor: Executor)
+class MesosExecutorBackend(executor: Executor)
   extends MesosExecutor
-  with ExecutorContext
+  with ExecutorBackend
   with Logging {
 
   var driver: ExecutorDriver = null
@@ -59,11 +59,11 @@ class MesosExecutorRunner(executor: Executor)
 /**
  * Entry point for Mesos executor.
  */
-object MesosExecutorRunner {
+object MesosExecutorBackend {
   def main(args: Array[String]) {
     MesosNativeLibrary.load()
     // Create a new Executor and start it running
-    val runner = new MesosExecutorRunner(new Executor)
+    val runner = new MesosExecutorBackend(new Executor)
     new MesosExecutorDriver(runner).run()
   }
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index c9b0c4e9b60532909d8e9e4bb2f414cb42f35d1e..7f1664b483739fcf590bef1d537956890a00d872 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -50,7 +50,7 @@ class ClusterScheduler(sc: SparkContext)
 
   // Listener object to pass upcalls into
   var listener: TaskSchedulerListener = null
-  var schedContext: ClusterSchedulerContext = null
+  var backend: SchedulerBackend = null
 
   val mapOutputTracker = SparkEnv.get.mapOutputTracker
 
@@ -58,15 +58,15 @@ class ClusterScheduler(sc: SparkContext)
     this.listener = listener
   }
 
-  def initialize(context: ClusterSchedulerContext) {
-    schedContext = context
+  def initialize(context: SchedulerBackend) {
+    backend = context
     createJarServer()
   }
 
   def newTaskId(): Long = nextTaskId.getAndIncrement()
 
   override def start() {
-    schedContext.start()
+    backend.start()
 
     if (System.getProperty("spark.speculation", "false") == "true") {
       new Thread("ClusterScheduler speculation check") {
@@ -95,7 +95,7 @@ class ClusterScheduler(sc: SparkContext)
       activeTaskSetsQueue += manager
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
     }
-    schedContext.reviveOffers()
+    backend.reviveOffers()
   }
 
   def taskSetFinished(manager: TaskSetManager) {
@@ -197,11 +197,11 @@
     }
     if (failedHost != None) {
       listener.hostLost(failedHost.get)
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
     if (taskFailed) {
       // Also revive offers if a task had failed for some reason other than host lost
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 
@@ -227,15 +227,15 @@
   }
 
   override def stop() {
-    if (schedContext != null) {
-      schedContext.stop()
+    if (backend != null) {
+      backend.stop()
     }
     if (jarServer != null) {
       jarServer.stop()
     }
   }
 
-  override def defaultParallelism() = schedContext.defaultParallelism()
+  override def defaultParallelism() = backend.defaultParallelism()
 
   // Create a server for all the JARs added by the user to SparkContext.
   // We first copy the JARs to a temp directory for easier server setup.
@@ -271,7 +271,7 @@ class ClusterScheduler(sc: SparkContext)
       }
     }
     if (shouldRevive) {
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 
@@ -288,7 +288,7 @@ class ClusterScheduler(sc: SparkContext)
     }
     if (failedHost != None) {
       listener.hostLost(failedHost.get)
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala
deleted file mode 100644
index 6b9687ac25c3c99039a89f6c6bc76e34413c69b7..0000000000000000000000000000000000000000
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package spark.scheduler.cluster
-
-trait ClusterSchedulerContext {
-  def start(): Unit
-  def stop(): Unit
-  def reviveOffers(): Unit
-  def defaultParallelism(): Int
-
-  // TODO: Probably want to add a killTask too
-}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
new file mode 100644
index 0000000000000000000000000000000000000000..897976c3f9fe979d872f0f3d9262369c7c199f61
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
@@ -0,0 +1,15 @@
+package spark.scheduler.cluster
+
+/**
+ * A backend interface for cluster scheduling systems that allows plugging in different ones under
+ * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
+ * machines become available and can launch tasks on them.
+ */
+trait SchedulerBackend {
+  def start(): Unit
+  def stop(): Unit
+  def reviveOffers(): Unit
+  def defaultParallelism(): Int
+
+  // TODO: Probably want to add a killTask too
+}
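To make the contract concrete, a SchedulerBackend is essentially a thin adapter between some source of machines and the ClusterScheduler entry points this patch uses elsewhere (resourceOffers and statusUpdate). The sketch below is illustrative only and not part of the patch: the class name, the single fake slave ID and host, and the extra statusUpdate helper are assumptions, and its calls simply mirror the ones StandaloneSchedulerBackend makes further down.

package spark.scheduler.cluster

import java.nio.ByteBuffer

import spark.TaskState.TaskState

// Illustrative sketch only (not part of this patch): a trivial backend that pretends to manage
// one slave with a fixed number of cores. How tasks actually reach an executor is left out.
class StubSchedulerBackend(scheduler: ClusterScheduler, cores: Int) extends SchedulerBackend {

  // Hypothetical identifiers for the single fake slave
  private val slaveId = "stub-slave"
  private val host = "localhost"

  def start() {}

  def stop() {}

  // Offer all of this backend's cores to the scheduler, as StandaloneSchedulerBackend.makeOffers does
  def reviveOffers() {
    scheduler.resourceOffers(Seq(new WorkerOffer(slaveId, host, cores)))
  }

  // A connected executor would report task state changes through something like this
  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    scheduler.statusUpdate(taskId, state, data)
  }

  def defaultParallelism(): Int = cores
}

A real backend additionally has to get the serialized task bytes chosen by the scheduler onto an actual executor process; that is the part the Mesos and standalone backends in this patch fill in.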
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index fad62f96aa61da0eb1b3b4e83d57dc3749d6fe7e..160977372d63af359d2c56014a45717fcd3f0479 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -1,5 +1,15 @@
 package spark.scheduler.cluster
 
+import java.nio.channels.Channels
 import java.nio.ByteBuffer
+import java.io.{IOException, EOFException, ObjectOutputStream, ObjectInputStream}
+import spark.util.SerializableByteBuffer
 
-class TaskDescription(val taskId: Long, val name: String, val serializedTask: ByteBuffer) {}
+class TaskDescription(val taskId: Long, val name: String, _serializedTask: ByteBuffer)
+  extends Serializable {
+
+  // Because ByteBuffers are not serializable, we wrap the task in a SerializableByteBuffer
+  private val buffer = new SerializableByteBuffer(_serializedTask)
+
+  def serializedTask: ByteBuffer = buffer.value
+}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
similarity index 98%
rename from core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
rename to core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index f5c35becdaf707e36d93e93ffe9a4b06610a38fa..4e95666da11d16f5b20e6decfafb75036d3b5d74 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -15,12 +15,12 @@ import java.util.{ArrayList => JArrayList, List => JList}
 import java.util.Collections
 
 import spark.TaskState
 
-class MesosScheduler(
+class MesosSchedulerBackend(
     scheduler: ClusterScheduler,
     sc: SparkContext,
     master: String,
     frameworkName: String)
-  extends ClusterSchedulerContext
+  extends SchedulerBackend
   with MScheduler
   with Logging {
@@ -58,11 +58,11 @@ class MesosScheduler(
 
   override def start() {
     synchronized {
-      new Thread("MesosScheduler driver") {
+      new Thread("MesosSchedulerBackend driver") {
         setDaemon(true)
 
         override def run() {
-          val sched = MesosScheduler.this
+          val sched = MesosSchedulerBackend.this
           val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
           driver = new MesosSchedulerDriver(sched, fwInfo, master)
           try {
diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4f922a51e1de91b959122788c44c41f10bd11344
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
@@ -0,0 +1,16 @@
+package spark.scheduler.standalone
+
+import spark.TaskState.TaskState
+import spark.scheduler.cluster.TaskDescription
+
+sealed trait StandaloneClusterMessage extends Serializable
+
+case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage
+
+case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: Array[Byte])
+  extends StandaloneClusterMessage
+
+case object ReviveOffers extends StandaloneClusterMessage
+case object StopMaster extends StandaloneClusterMessage
+
diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5ace6622aa79db3807f1969cd8d3d261de88ee66
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala
@@ -0,0 +1,106 @@
+package spark.scheduler.standalone
+
+import scala.collection.mutable.{HashMap, HashSet}
+
+import akka.actor.{Props, Actor, ActorRef, ActorSystem}
+import akka.util.duration._
+import akka.pattern.ask
+
+import spark.{SparkException, Logging, TaskState}
+import spark.TaskState.TaskState
+import spark.scheduler.cluster.{WorkerOffer, ClusterScheduler, SchedulerBackend}
+import akka.dispatch.Await
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * A standalone scheduler backend, which waits for standalone executors to connect to it through
+ * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
+ * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
+ */
+class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+  extends SchedulerBackend
+  with Logging {
+
+  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
+  var totalCoreCount = new AtomicInteger(0)
+
+  class MasterActor extends Actor {
+    val slaveActor = new HashMap[String, ActorRef]
+    val slaveHost = new HashMap[String, String]
+    val freeCores = new HashMap[String, Int]
+
+    def receive = {
+      case RegisterSlave(slaveId, host, cores) =>
+        slaveActor(slaveId) = sender
+        logInfo("Registered slave: " + sender + " with ID " + slaveId)
+        slaveHost(slaveId) = host
+        freeCores(slaveId) = cores
+        totalCoreCount.addAndGet(cores)
+        makeOffers()
+
+      case StatusUpdate(slaveId, taskId, state, data) =>
+        scheduler.statusUpdate(taskId, state, ByteBuffer.wrap(data))
+        if (TaskState.isFinished(state)) {
+          freeCores(slaveId) += 1
+          makeOffers(slaveId)
+        }
+
+      case LaunchTask(slaveId, task) =>
+        freeCores(slaveId) -= 1
+        slaveActor(slaveId) ! LaunchTask(slaveId, task)
+
+      case ReviveOffers =>
+        makeOffers()
+
+      case StopMaster =>
+        sender ! true
+        context.stop(self)
+
+      // TODO: Deal with nodes disconnecting too! (Including decreasing totalCoreCount)
+    }
+
+    // Make fake resource offers on all slaves
+    def makeOffers() {
+      scheduler.resourceOffers(
+        slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})
+    }
+
+    // Make fake resource offers on just one slave
+    def makeOffers(slaveId: String) {
+      scheduler.resourceOffers(
+        Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))
+    }
+  }
+
+  var masterActor: ActorRef = null
+  val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+
+  def start() {
+    masterActor = actorSystem.actorOf(
+      Props(new MasterActor), name = StandaloneSchedulerBackend.ACTOR_NAME)
+  }
+
+  def stop() {
+    try {
+      if (masterActor != null) {
+        val timeout = 5.seconds
+        val future = masterActor.ask(StopMaster)(timeout)
+        Await.result(future, timeout)
+      }
+    } catch {
+      case e: Exception =>
+        throw new SparkException("Error stopping standalone scheduler master actor", e)
+    }
+  }
+
+  def reviveOffers() {
+    masterActor ! ReviveOffers
+  }
+
+  def defaultParallelism(): Int = totalCoreCount.get()
+}
+
+object StandaloneSchedulerBackend {
+  val ACTOR_NAME = "StandaloneScheduler"
+}
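The executor side of this protocol is not part of this patch: nothing here yet sends RegisterSlave or StatusUpdate. As a rough illustration of how it could plug in, the sketch below combines the ExecutorBackend interface introduced earlier in the patch with the standalone messages defined above. The class name, constructor parameters, and the Akka URL lookup of the master actor are assumptions, not code from the patch.

package spark.executor

import java.nio.ByteBuffer

import akka.actor.Actor

import spark.TaskState.TaskState
import spark.scheduler.standalone.{LaunchTask, RegisterSlave, StatusUpdate}

// Hypothetical executor-side actor for the standalone backend (illustrative only, not in this
// patch). Only the message types and the Executor/ExecutorBackend APIs are taken from the patch.
class StandaloneExecutorBackend(executor: Executor, masterUrl: String, slaveId: String,
    host: String, cores: Int)
  extends Actor
  with ExecutorBackend {

  // Look up the master actor created by StandaloneSchedulerBackend (Akka 2.0-style URL lookup)
  val master = context.actorFor(masterUrl)

  override def preStart() {
    // Tell the master who we are and how many cores we can run tasks on
    master ! RegisterSlave(slaveId, host, cores)
  }

  def receive = {
    case LaunchTask(_, task) =>
      // Hand the serialized task to the Executor's thread pool; it calls statusUpdate() below
      executor.launchTask(this, task.taskId, task.serializedTask)
  }

  // ExecutorBackend callback used by Executor's TaskRunner to report task state changes
  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val bytes = new Array[Byte](data.remaining)
    data.get(bytes)
    master ! StatusUpdate(slaveId, taskId, state, bytes)
  }
}

Because the master's RegisterSlave handler stores sender as the slave's ActorRef, the LaunchTask messages it forwards would arrive at this actor's receive method.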
diff --git a/core/src/main/scala/spark/util/SerializableByteBuffer.scala b/core/src/main/scala/spark/util/SerializableByteBuffer.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f7c8112346b48895f0216ea49b9dcfd3b4556f8c
--- /dev/null
+++ b/core/src/main/scala/spark/util/SerializableByteBuffer.scala
@@ -0,0 +1,35 @@
+package spark.util
+
+import java.nio.ByteBuffer
+import java.io.{IOException, ObjectOutputStream, EOFException, ObjectInputStream}
+import java.nio.channels.Channels
+
+/**
+ * A wrapper around java.nio.ByteBuffer to make it serializable through Java serialization.
+ */
+class SerializableByteBuffer(@transient var buffer: ByteBuffer) extends Serializable {
+  def value = buffer
+
+  private def readObject(in: ObjectInputStream) {
+    val length = in.readInt()
+    buffer = ByteBuffer.allocate(length)
+    var amountRead = 0
+    val channel = Channels.newChannel(in)
+    while (amountRead < length) {
+      val ret = channel.read(buffer)
+      if (ret == -1) {
+        throw new EOFException("End of file before fully reading buffer")
+      }
+      amountRead += ret
+    }
+    buffer.rewind() // Allow us to read it later
+  }
+
+  private def writeObject(out: ObjectOutputStream) {
+    out.writeInt(buffer.limit())
+    if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
+      throw new IOException("Could not fully write buffer to output stream")
+    }
+    buffer.rewind() // Allow us to write it again later
+  }
+}
diff --git a/spark-executor b/spark-executor
index 2d6934f7da53134e963e9f703b3b0ddd778e7bdc..b66c374ca865fddfc8197d711de4acae3a8c7b47 100755
--- a/spark-executor
+++ b/spark-executor
@@ -1,4 +1,4 @@
 #!/bin/sh
 FWDIR="`dirname $0`"
 echo "Running spark-executor with framework dir = $FWDIR"
-exec $FWDIR/run spark.executor.MesosExecutorRunner
+exec $FWDIR/run spark.executor.MesosExecutorBackend
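As a quick illustration of what the wrapper buys us, the small check below (illustrative only, not part of this patch) round-trips a TaskDescription through plain Java serialization, which is also how the Serializable standalone messages above would travel through Akka by default. Writing the object invokes SerializableByteBuffer.writeObject, which records the buffer's length and copies its bytes into the stream; reading it back rebuilds a fresh ByteBuffer of the same size. The object name and the literal task values are made up for the example.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

import spark.scheduler.cluster.TaskDescription

// Illustrative check (not part of this patch) that a TaskDescription survives Java serialization
// now that its ByteBuffer is wrapped in a SerializableByteBuffer.
object TaskDescriptionSerializationCheck {
  def main(args: Array[String]) {
    val task = new TaskDescription(42L, "task 42", ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)))

    // Serialize with plain Java serialization
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(task)
    out.close()

    // Deserialize and confirm the wrapped buffer comes back with the same length
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val copy = in.readObject().asInstanceOf[TaskDescription]
    assert(copy.taskId == 42L)
    assert(copy.serializedTask.remaining() == 4)
  }
}

A TaskDescription holding a bare ByteBuffer field would fail the same round trip with a NotSerializableException, which is the problem the _serializedTask/buffer indirection above works around.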