Skip to content
Snippets Groups Projects
Commit a3da5088 authored by CodingCat's avatar CodingCat Committed by Kay Ousterhout
Browse files

SPARK-1171: when executor is removed, we should minus totalCores instead of...

SPARK-1171: when executor is removed, we should minus totalCores instead of just freeCores on that executor

https://spark-project.atlassian.net/browse/SPARK-1171

When the executor is removed, the current implementation will only minus the freeCores of that executor. Actually we should minus the totalCores...

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>

Closes #63 from CodingCat/simplify_CoarseGrainedSchedulerBackend and squashes the following commits:

f6bf93f [Nan Zhu] code clean
19c2bb4 [CodingCat] use copy idiom to reconstruct the workerOffers
43c13e9 [CodingCat] keep WorkerOffer immutable
af470d3 [CodingCat] style fix
0c0e409 [CodingCat] simplify the implementation of CoarseGrainedSchedulerBackend
parent 02836657
No related branches found
No related tags found
No related merge requests found
...@@ -21,4 +21,4 @@ package org.apache.spark.scheduler ...@@ -21,4 +21,4 @@ package org.apache.spark.scheduler
* Represents free resources available on an executor. * Represents free resources available on an executor.
*/ */
private[spark] private[spark]
class WorkerOffer(val executorId: String, val host: String, val cores: Int) case class WorkerOffer(executorId: String, host: String, cores: Int)
...@@ -54,6 +54,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A ...@@ -54,6 +54,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
private val executorAddress = new HashMap[String, Address] private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String] private val executorHost = new HashMap[String, String]
private val freeCores = new HashMap[String, Int] private val freeCores = new HashMap[String, Int]
private val totalCores = new HashMap[String, Int]
private val addressToExecutorId = new HashMap[Address, String] private val addressToExecutorId = new HashMap[Address, String]
override def preStart() { override def preStart() {
...@@ -76,6 +77,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A ...@@ -76,6 +77,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
sender ! RegisteredExecutor(sparkProperties) sender ! RegisteredExecutor(sparkProperties)
executorActor(executorId) = sender executorActor(executorId) = sender
executorHost(executorId) = Utils.parseHostPort(hostPort)._1 executorHost(executorId) = Utils.parseHostPort(hostPort)._1
totalCores(executorId) = cores
freeCores(executorId) = cores freeCores(executorId) = cores
executorAddress(executorId) = sender.path.address executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId addressToExecutorId(sender.path.address) = executorId
...@@ -147,10 +149,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A ...@@ -147,10 +149,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
def removeExecutor(executorId: String, reason: String) { def removeExecutor(executorId: String, reason: String) {
if (executorActor.contains(executorId)) { if (executorActor.contains(executorId)) {
logInfo("Executor " + executorId + " disconnected, so removing it") logInfo("Executor " + executorId + " disconnected, so removing it")
val numCores = freeCores(executorId) val numCores = totalCores(executorId)
addressToExecutorId -= executorAddress(executorId)
executorActor -= executorId executorActor -= executorId
executorHost -= executorId executorHost -= executorId
addressToExecutorId -= executorAddress(executorId)
executorAddress -= executorId
totalCores -= executorId
freeCores -= executorId freeCores -= executorId
totalCoreCount.addAndGet(-numCores) totalCoreCount.addAndGet(-numCores)
scheduler.executorLost(executorId, SlaveLost(reason)) scheduler.executorLost(executorId, SlaveLost(reason))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment