From 909b325243ebedfc1bd47fc3d7f70cde178508fc Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Fri, 6 Jul 2012 17:56:44 -0700
Subject: [PATCH] Further refactoring, and start of a standalone scheduler
 backend

---
 core/src/main/scala/spark/SparkContext.scala  |   6 +-
 .../main/scala/spark/executor/Executor.scala  |   4 +-
 ...torContext.scala => ExecutorBackend.scala} |   4 +-
 ...unner.scala => MesosExecutorBackend.scala} |   8 +-
 .../scheduler/cluster/ClusterScheduler.scala  |  24 ++--
 .../cluster/ClusterSchedulerContext.scala     |  10 --
 .../scheduler/cluster/SchedulerBackend.scala  |  15 +++
 .../scheduler/cluster/TaskDescription.scala   |  12 +-
 ...uler.scala => MesosSchedulerBackend.scala} |   8 +-
 .../standalone/StandaloneClusterMessage.scala |  16 +++
 .../StandaloneSchedulerBackend.scala          | 106 ++++++++++++++++++
 .../spark/util/SerializableByteBuffer.scala   |  35 ++++++
 spark-executor                                |   2 +-
 13 files changed, 211 insertions(+), 39 deletions(-)
 rename core/src/main/scala/spark/executor/{ExecutorContext.scala => ExecutorBackend.scala} (59%)
 rename core/src/main/scala/spark/executor/{MesosExecutorRunner.scala => MesosExecutorBackend.scala} (92%)
 delete mode 100644 core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala
 create mode 100644 core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
 rename core/src/main/scala/spark/scheduler/mesos/{MesosScheduler.scala => MesosSchedulerBackend.scala} (98%)
 create mode 100644 core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
 create mode 100644 core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala
 create mode 100644 core/src/main/scala/spark/util/SerializableByteBuffer.scala

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 2e8cb609b2..8a06642426 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import spark.scheduler.DAGScheduler
 import spark.scheduler.TaskScheduler
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.ClusterScheduler
-import spark.scheduler.mesos.MesosScheduler
+import spark.scheduler.mesos.MesosSchedulerBackend
 import spark.storage.BlockManagerMaster
 
 class SparkContext(
@@ -90,14 +90,14 @@ class SparkContext(
       case _ =>
         MesosNativeLibrary.load()
         val sched = new ClusterScheduler(this)
-        val schedContext = new MesosScheduler(sched, this, master, frameworkName)
+        val schedContext = new MesosSchedulerBackend(sched, this, master, frameworkName)
         sched.initialize(schedContext)
         sched
         /*
         if (System.getProperty("spark.mesos.coarse", "false") == "true") {
           new CoarseMesosScheduler(this, master, frameworkName)
         } else {
-          new MesosScheduler(this, master, frameworkName)
+          new MesosSchedulerBackend(this, master, frameworkName)
         }
         */
     }
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index ac30ae9aec..e3958cec51 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -47,11 +47,11 @@ class Executor extends Logging {
       1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
   }
 
-  def launchTask(context: ExecutorContext, taskId: Long, serializedTask: ByteBuffer) {
+  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
   }
 
-  class TaskRunner(context: ExecutorContext, taskId: Long, serializedTask: ByteBuffer)
+  class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
     extends Runnable {
 
     override def run() {
diff --git a/core/src/main/scala/spark/executor/ExecutorContext.scala b/core/src/main/scala/spark/executor/ExecutorBackend.scala
similarity index 59%
rename from core/src/main/scala/spark/executor/ExecutorContext.scala
rename to core/src/main/scala/spark/executor/ExecutorBackend.scala
index 6b86d8d18a..24c8776f31 100644
--- a/core/src/main/scala/spark/executor/ExecutorContext.scala
+++ b/core/src/main/scala/spark/executor/ExecutorBackend.scala
@@ -4,8 +4,8 @@ import java.nio.ByteBuffer
 import spark.TaskState.TaskState
 
 /**
- * Interface used by Executor to send back updates to the cluster scheduler.
+ * A pluggable interface used by the Executor to send updates to the cluster scheduler.
  */
-trait ExecutorContext {
+trait ExecutorBackend {
   def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
 }
diff --git a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
similarity index 92%
rename from core/src/main/scala/spark/executor/MesosExecutorRunner.scala
rename to core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index f97d9d0bfa..50f4e41ede 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorRunner.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -8,9 +8,9 @@ import com.google.protobuf.ByteString
 import spark.{Utils, Logging}
 import spark.TaskState
 
-class MesosExecutorRunner(executor: Executor)
+class MesosExecutorBackend(executor: Executor)
   extends MesosExecutor
-  with ExecutorContext
+  with ExecutorBackend
   with Logging {
 
   var driver: ExecutorDriver = null
@@ -59,11 +59,11 @@ class MesosExecutorRunner(executor: Executor)
 /**
  * Entry point for Mesos executor.
  */
-object MesosExecutorRunner {
+object MesosExecutorBackend {
   def main(args: Array[String]) {
     MesosNativeLibrary.load()
     // Create a new Executor and start it running
-    val runner = new MesosExecutorRunner(new Executor)
+    val runner = new MesosExecutorBackend(new Executor)
     new MesosExecutorDriver(runner).run()
   }
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index c9b0c4e9b6..7f1664b483 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -50,7 +50,7 @@ class ClusterScheduler(sc: SparkContext)
   // Listener object to pass upcalls into
   var listener: TaskSchedulerListener = null
 
-  var schedContext: ClusterSchedulerContext = null
+  var backend: SchedulerBackend = null
 
   val mapOutputTracker = SparkEnv.get.mapOutputTracker
 
@@ -58,15 +58,15 @@ class ClusterScheduler(sc: SparkContext)
     this.listener = listener
   }
 
-  def initialize(context: ClusterSchedulerContext) {
-    schedContext = context
+  def initialize(context: SchedulerBackend) {
+    backend = context
     createJarServer()
   }
 
   def newTaskId(): Long = nextTaskId.getAndIncrement()
 
   override def start() {
-    schedContext.start()
+    backend.start()
 
     if (System.getProperty("spark.speculation", "false") == "true") {
       new Thread("ClusterScheduler speculation check") {
@@ -95,7 +95,7 @@ class ClusterScheduler(sc: SparkContext)
       activeTaskSetsQueue += manager
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
     }
-    schedContext.reviveOffers()
+    backend.reviveOffers()
   }
 
   def taskSetFinished(manager: TaskSetManager) {
@@ -197,11 +197,11 @@ class ClusterScheduler(sc: SparkContext)
     }
     if (failedHost != None) {
       listener.hostLost(failedHost.get)
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
     if (taskFailed) {
       // Also revive offers if a task had failed for some reason other than host lost
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 
@@ -227,15 +227,15 @@ class ClusterScheduler(sc: SparkContext)
   }
 
   override def stop() {
-    if (schedContext != null) {
-      schedContext.stop()
+    if (backend != null) {
+      backend.stop()
     }
     if (jarServer != null) {
       jarServer.stop()
     }
   }
 
-  override def defaultParallelism() = schedContext.defaultParallelism()
+  override def defaultParallelism() = backend.defaultParallelism()
 
   // Create a server for all the JARs added by the user to SparkContext.
   // We first copy the JARs to a temp directory for easier server setup.
@@ -271,7 +271,7 @@ class ClusterScheduler(sc: SparkContext)
       }
     }
     if (shouldRevive) {
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 
@@ -288,7 +288,7 @@ class ClusterScheduler(sc: SparkContext)
     }
     if (failedHost != None) {
       listener.hostLost(failedHost.get)
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala
deleted file mode 100644
index 6b9687ac25..0000000000
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package spark.scheduler.cluster
-
-trait ClusterSchedulerContext {
-  def start(): Unit
-  def stop(): Unit
-  def reviveOffers(): Unit
-  def defaultParallelism(): Int
-
-  // TODO: Probably want to add a killTask too
-}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
new file mode 100644
index 0000000000..897976c3f9
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
@@ -0,0 +1,15 @@
+package spark.scheduler.cluster
+
+/**
+ * A backend interface for cluster scheduling systems that allows plugging in different ones under
+ * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
+ * machines become available and can launch tasks on them.
+ */
+trait SchedulerBackend {
+  def start(): Unit
+  def stop(): Unit
+  def reviveOffers(): Unit
+  def defaultParallelism(): Int
+
+  // TODO: Probably want to add a killTask too
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index fad62f96aa..160977372d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -1,5 +1,15 @@
 package spark.scheduler.cluster
 
+import java.nio.channels.Channels
 import java.nio.ByteBuffer
+import java.io.{IOException, EOFException, ObjectOutputStream, ObjectInputStream}
+import spark.util.SerializableByteBuffer
 
-class TaskDescription(val taskId: Long, val name: String, val serializedTask: ByteBuffer) {}
+class TaskDescription(val taskId: Long, val name: String, _serializedTask: ByteBuffer)
+  extends Serializable {
+
+  // Because ByteBuffers are not serializable, we wrap the task in a SerializableByteBuffer
+  private val buffer = new SerializableByteBuffer(_serializedTask)
+
+  def serializedTask: ByteBuffer = buffer.value
+}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
similarity index 98%
rename from core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
rename to core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index f5c35becda..4e95666da1 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -15,12 +15,12 @@ import java.util.{ArrayList => JArrayList, List => JList}
 import java.util.Collections
 import spark.TaskState
 
-class MesosScheduler(
+class MesosSchedulerBackend(
     scheduler: ClusterScheduler,
     sc: SparkContext,
     master: String,
     frameworkName: String)
-  extends ClusterSchedulerContext
+  extends SchedulerBackend
   with MScheduler
   with Logging {
 
@@ -58,11 +58,11 @@ class MesosScheduler(
 
   override def start() {
     synchronized {
-      new Thread("MesosScheduler driver") {
+      new Thread("MesosSchedulerBackend driver") {
         setDaemon(true)
 
         override def run() {
-          val sched = MesosScheduler.this
+          val sched = MesosSchedulerBackend.this
           val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
           driver = new MesosSchedulerDriver(sched, fwInfo, master)
           try {
diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
new file mode 100644
index 0000000000..4f922a51e1
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala
@@ -0,0 +1,16 @@
+package spark.scheduler.standalone
+
+import spark.TaskState.TaskState
+import spark.scheduler.cluster.TaskDescription
+
+sealed trait StandaloneClusterMessage extends Serializable
+
+case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage
+
+case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: Array[Byte])
+  extends StandaloneClusterMessage
+
+case object ReviveOffers extends StandaloneClusterMessage
+case object StopMaster extends StandaloneClusterMessage
+
diff --git a/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala
new file mode 100644
index 0000000000..5ace6622aa
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala
@@ -0,0 +1,106 @@
+package spark.scheduler.standalone
+
+import scala.collection.mutable.{HashMap, HashSet}
+
+import akka.actor.{Props, Actor, ActorRef, ActorSystem}
+import akka.util.duration._
+import akka.pattern.ask
+
+import spark.{SparkException, Logging, TaskState}
+import spark.TaskState.TaskState
+import spark.scheduler.cluster.{WorkerOffer, ClusterScheduler, SchedulerBackend}
+import akka.dispatch.Await
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * A standalone scheduler backend, which waits for standalone executors to connect to it through
+ * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
+ * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
+ */
+class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+  extends SchedulerBackend
+  with Logging {
+
+  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
+  var totalCoreCount = new AtomicInteger(0)
+
+  class MasterActor extends Actor {
+    val slaveActor = new HashMap[String, ActorRef]
+    val slaveHost = new HashMap[String, String]
+    val freeCores = new HashMap[String, Int]
+
+    def receive = {
+      case RegisterSlave(slaveId, host, cores) =>
+        slaveActor(slaveId) = sender
+        logInfo("Registered slave: " + sender + " with ID " + slaveId)
+        slaveHost(slaveId) = host
+        freeCores(slaveId) = cores
+        totalCoreCount.addAndGet(cores)
+        makeOffers()
+
+      case StatusUpdate(slaveId, taskId, state, data) =>
+        scheduler.statusUpdate(taskId, state, ByteBuffer.wrap(data))
+        if (TaskState.isFinished(state)) {
+          freeCores(slaveId) += 1
+          makeOffers(slaveId)
+        }
+
+      case LaunchTask(slaveId, task) =>
+        freeCores(slaveId) -= 1
+        slaveActor(slaveId) ! LaunchTask(slaveId, task)
+
+      case ReviveOffers =>
+        makeOffers()
+
+      case StopMaster =>
+        sender ! true
+        context.stop(self)
+
+      // TODO: Deal with nodes disconnecting too! (Including decreasing totalCoreCount)
+    }
+
+    // Make fake resource offers on all slaves
+    def makeOffers() {
+      scheduler.resourceOffers(
+        slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})
+    }
+
+    // Make fake resource offers on just one slave
+    def makeOffers(slaveId: String) {
+      scheduler.resourceOffers(
+        Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))
+    }
+  }
+
+  var masterActor: ActorRef = null
+  val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+
+  def start() {
+    masterActor = actorSystem.actorOf(
+      Props(new MasterActor), name = StandaloneSchedulerBackend.ACTOR_NAME)
+  }
+
+  def stop() {
+    try {
+      if (masterActor != null) {
+        val timeout = 5.seconds
+        val future = masterActor.ask(StopMaster)(timeout)
+        Await.result(future, timeout)
+      }
+    } catch {
+      case e: Exception =>
+        throw new SparkException("Error stopping standalone scheduler master actor", e)
+    }
+  }
+
+  def reviveOffers() {
+    masterActor ! ReviveOffers
+  }
+
+  def defaultParallelism(): Int = totalCoreCount.get()
+}
+
+object StandaloneSchedulerBackend {
+  val ACTOR_NAME = "StandaloneScheduler"
+}
diff --git a/core/src/main/scala/spark/util/SerializableByteBuffer.scala b/core/src/main/scala/spark/util/SerializableByteBuffer.scala
new file mode 100644
index 0000000000..f7c8112346
--- /dev/null
+++ b/core/src/main/scala/spark/util/SerializableByteBuffer.scala
@@ -0,0 +1,35 @@
+package spark.util
+
+import java.nio.ByteBuffer
+import java.io.{IOException, ObjectOutputStream, EOFException, ObjectInputStream}
+import java.nio.channels.Channels
+
+/**
+ * A wrapper around java.nio.ByteBuffer to make it serializable through Java serialization.
+ */
+class SerializableByteBuffer(@transient var buffer: ByteBuffer) {
+  def value = buffer
+
+  private def readObject(in: ObjectInputStream) {
+    val length = in.readInt()
+    buffer = ByteBuffer.allocate(length)
+    var amountRead = 0
+    val channel = Channels.newChannel(in)
+    while (amountRead < length) {
+      val ret = channel.read(buffer)
+      if (ret == -1) {
+        throw new EOFException("End of file before fully reading buffer")
+      }
+      amountRead += ret
+    }
+    buffer.rewind() // Allow us to read it later
+  }
+
+  private def writeObject(out: ObjectOutputStream) {
+    out.writeInt(buffer.limit())
+    if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
+      throw new IOException("Could not fully write buffer to output stream")
+    }
+    buffer.rewind() // Allow us to write it again later
+  }
+}
diff --git a/spark-executor b/spark-executor
index 2d6934f7da..b66c374ca8 100755
--- a/spark-executor
+++ b/spark-executor
@@ -1,4 +1,4 @@
 #!/bin/sh
 FWDIR="`dirname $0`"
 echo "Running spark-executor with framework dir = $FWDIR"
-exec $FWDIR/run spark.executor.MesosExecutorRunner
+exec $FWDIR/run spark.executor.MesosExecutorBackend
-- 
GitLab