Commit 6607f546 authored by Matei Zaharia

Added an option to spread out jobs in the standalone mode.

parent 66cbdee9
@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   val waitingJobs = new ArrayBuffer[JobInfo]
   val completedJobs = new ArrayBuffer[JobInfo]
 
+  // As a temporary workaround before better ways of configuring memory, we allow users to set
+  // a flag that will perform round-robin scheduling across the nodes (spreading out each job
+  // among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
+  val spreadOutJobs = System.getProperty("spark.deploy.spreadOut", "false").toBoolean
+
   override def preStart() {
     logInfo("Starting Spark master at spark://" + ip + ":" + port)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
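Since the Master reads spark.deploy.spreadOut as a plain JVM system property, one way to turn the new behaviour on is to set that property on the master's JVM before the Master actor is constructed. A minimal sketch (the launcher object below is hypothetical, not part of Spark):

    // Hypothetical launcher: sets the flag before the standalone Master reads it.
    // Equivalent to passing -Dspark.deploy.spreadOut=true on the master JVM's command line.
    object EnableSpreadOut {
      def main(args: Array[String]): Unit = {
        System.setProperty("spark.deploy.spreadOut", "true")
        // ... start the standalone Master from here, or rely on the usual launch scripts
        // and pass the -D flag instead.
      }
    }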
@@ -127,24 +132,58 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     }
   }
 
+  /**
+   * Can a job use the given worker? True if the worker has enough memory and we haven't already
+   * launched an executor for the job on it (right now the standalone backend doesn't like having
+   * two executors on the same worker).
+   */
+  def canUse(job: JobInfo, worker: WorkerInfo): Boolean = {
+    worker.memoryFree >= job.desc.memoryPerSlave && !worker.hasExecutor(job)
+  }
+
   /**
    * Schedule the currently available resources among waiting jobs. This method will be called
    * every time a new job joins or resource availability changes.
    */
   def schedule() {
-    // Right now this is a very simple FIFO scheduler. We keep looking through the jobs
-    // in order of submission time and launching the first one that fits on each node.
-    for (worker <- workers if worker.coresFree > 0) {
-      for (job <- waitingJobs.clone()) {
-        val jobMemory = job.desc.memoryPerSlave
-        if (worker.memoryFree >= jobMemory) {
-          val coresToUse = math.min(worker.coresFree, job.coresLeft)
-          val exec = job.addExecutor(worker, coresToUse)
-          launchExecutor(worker, exec)
-        }
-        if (job.coresLeft == 0) {
-          waitingJobs -= job
-          job.state = JobState.RUNNING
-        }
-      }
-    }
+    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first job
+    // in the queue, then the second job, etc.
+    if (spreadOutJobs) {
+      // Try to spread out each job among all the nodes, until it has all its cores
+      for (job <- waitingJobs if job.coresLeft > 0) {
+        val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse
+        val numUsable = usableWorkers.length
+        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
+        var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
+        var pos = 0
+        while (toAssign > 0) {
+          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
+            toAssign -= 1
+            assigned(pos) += 1
+          }
+          pos = (pos + 1) % numUsable
+        }
+        // Now that we've decided how many cores to give on each node, let's actually give them
+        for (pos <- 0 until numUsable) {
+          if (assigned(pos) > 0) {
+            val exec = job.addExecutor(usableWorkers(pos), assigned(pos))
+            launchExecutor(usableWorkers(pos), exec)
+            job.state = JobState.RUNNING
+          }
+        }
+      }
+    } else {
+      // Pack each job into as few nodes as possible until we've assigned all its cores
+      for (worker <- workers if worker.coresFree > 0) {
+        for (job <- waitingJobs if job.coresLeft > 0) {
+          if (canUse(job, worker)) {
+            val coresToUse = math.min(worker.coresFree, job.coresLeft)
+            if (coresToUse > 0) {
+              val exec = job.addExecutor(worker, coresToUse)
+              launchExecutor(worker, exec)
+              job.state = JobState.RUNNING
+            }
+          }
+        }
+      }
+    }
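The spread-out branch above hands out cores one at a time, round-robin, over the usable workers until the job has all the cores it can get. As a concrete example, a job that still needs 6 cores on three usable workers with 4 free cores each ends up with 2 cores on every worker, whereas the packing branch would take 4 from the first worker and 2 from the second. The following self-contained sketch mirrors that loop; the names (SpreadOutSketch, assignCores, freeCores, coresWanted) are illustrative and not Spark API:

    // Illustrative sketch of the round-robin assignment used when spark.deploy.spreadOut=true.
    object SpreadOutSketch {
      /** Hand out up to `coresWanted` cores across workers, one core at a time, round-robin. */
      def assignCores(coresWanted: Int, freeCores: Array[Int]): Array[Int] = {
        val assigned = new Array[Int](freeCores.length) // cores granted on each worker
        var toAssign = math.min(coresWanted, freeCores.sum)
        var pos = 0
        while (toAssign > 0) {
          if (freeCores(pos) - assigned(pos) > 0) { // this worker still has a spare core
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % freeCores.length // move on to the next worker
        }
        assigned
      }

      def main(args: Array[String]): Unit = {
        println(assignCores(6, Array(4, 4, 4)).mkString(", ")) // 2, 2, 2: spread evenly
        println(assignCores(6, Array(8, 2, 1)).mkString(", ")) // 3, 2, 1: full workers are skipped
      }
    }

Sorting the usable workers by descending coresFree before this loop, as the real code does, means the workers with the most free cores absorb any remainder when the split is uneven.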
@@ -33,6 +33,10 @@ private[spark] class WorkerInfo(
       memoryUsed -= exec.memory
     }
   }
 
+  def hasExecutor(job: JobInfo): Boolean = {
+    executors.values.exists(_.job == job)
+  }
+
   def webUiAddress : String = {
     "http://" + this.host + ":" + this.webUiPort
@@ -10,12 +10,7 @@
   </td>
   <td>@job.desc.name</td>
   <td>
-    @job.coresGranted Granted
-    @if(job.desc.cores == Integer.MAX_VALUE) {
-
-    } else {
-      , @job.coresLeft
-    }
+    @job.coresGranted
   </td>
   <td>@Utils.memoryMegabytesToString(job.desc.memoryPerSlave)</td>
   <td>@formatDate(job.submitDate)</td>