From 6607f546ccadf307b0a862f1b52ab0b12316420d Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Thu, 8 Nov 2012 23:13:12 -0800
Subject: [PATCH] Added an option to spread out jobs in the standalone mode.

---
 .../scala/spark/deploy/master/Master.scala    | 63 +++++++++++++++----
 .../spark/deploy/master/WorkerInfo.scala      |  4 ++
 .../spark/deploy/master/job_row.scala.html    |  7 +--
 3 files changed, 56 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 5ef7411f4d..7e5cd6b171 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   val waitingJobs = new ArrayBuffer[JobInfo]
   val completedJobs = new ArrayBuffer[JobInfo]
 
+  // As a temporary workaround before better ways of configuring memory, we allow users to set
+  // a flag that will perform round-robin scheduling across the nodes (spreading out each job
+  // among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
+  val spreadOutJobs = System.getProperty("spark.deploy.spreadOut", "false").toBoolean
+
   override def preStart() {
     logInfo("Starting Spark master at spark://" + ip + ":" + port)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -127,24 +132,58 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     }
   }
 
+  /**
+   * Can a job use the given worker? True if the worker has enough memory and we haven't already
+   * launched an executor for the job on it (right now the standalone backend doesn't like having
+   * two executors on the same worker).
+   */
+  def canUse(job: JobInfo, worker: WorkerInfo): Boolean = {
+    worker.memoryFree >= job.desc.memoryPerSlave && !worker.hasExecutor(job)
+  }
+
   /**
    * Schedule the currently available resources among waiting jobs. This method will be called
    * every time a new job joins or resource availability changes.
    */
   def schedule() {
-    // Right now this is a very simple FIFO scheduler. We keep looking through the jobs
-    // in order of submission time and launching the first one that fits on each node.
-    for (worker <- workers if worker.coresFree > 0) {
-      for (job <- waitingJobs.clone()) {
-        val jobMemory = job.desc.memoryPerSlave
-        if (worker.memoryFree >= jobMemory) {
-          val coresToUse = math.min(worker.coresFree, job.coresLeft)
-          val exec = job.addExecutor(worker, coresToUse)
-          launchExecutor(worker, exec)
+    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first job
+    // in the queue, then the second job, etc.
+    if (spreadOutJobs) {
+      // Try to spread out each job among all the nodes, until it has all its cores
+      for (job <- waitingJobs if job.coresLeft > 0) {
+        val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse
+        val numUsable = usableWorkers.length
+        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
+        var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
+        var pos = 0
+        while (toAssign > 0) {
+          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
+            toAssign -= 1
+            assigned(pos) += 1
+          }
+          pos = (pos + 1) % numUsable
         }
-        if (job.coresLeft == 0) {
-          waitingJobs -= job
-          job.state = JobState.RUNNING
+        // Now that we've decided how many cores to give on each node, let's actually give them
+        for (pos <- 0 until numUsable) {
+          if (assigned(pos) > 0) {
+            val exec = job.addExecutor(usableWorkers(pos), assigned(pos))
+            launchExecutor(usableWorkers(pos), exec)
+            job.state = JobState.RUNNING
+          }
+        }
+      }
+    } else {
+      // Pack each job into as few nodes as possible until we've assigned all its cores
+      for (worker <- workers if worker.coresFree > 0) {
+        for (job <- waitingJobs if job.coresLeft > 0) {
+          if (canUse(job, worker)) {
+            val coresToUse = math.min(worker.coresFree, job.coresLeft)
+            if (coresToUse > 0) {
+              val exec = job.addExecutor(worker, coresToUse)
+              launchExecutor(worker, exec)
+              job.state = JobState.RUNNING
+            }
+          }
         }
       }
     }
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index 16b3f9b653..706b1453aa 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -33,6 +33,10 @@ private[spark] class WorkerInfo(
       memoryUsed -= exec.memory
     }
   }
+
+  def hasExecutor(job: JobInfo): Boolean = {
+    executors.values.exists(_.job == job)
+  }
   
   def webUiAddress : String = {
     "http://" + this.host + ":" + this.webUiPort
diff --git a/core/src/main/twirl/spark/deploy/master/job_row.scala.html b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
index fff7953e7d..7c466a6a2c 100644
--- a/core/src/main/twirl/spark/deploy/master/job_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
@@ -10,12 +10,7 @@
   </td>
   <td>@job.desc.name</td>
   <td>
-    @job.coresGranted Granted
-    @if(job.desc.cores == Integer.MAX_VALUE) {
-      
-    } else {
-      , @job.coresLeft
-    }
+    @job.coresGranted
   </td>
   <td>@Utils.memoryMegabytesToString(job.desc.memoryPerSlave)</td>
   <td>@formatDate(job.submitDate)</td>
-- 
GitLab