From 28dbae85aaf6842e22cd7465cb11cb34d58fc56d Mon Sep 17 00:00:00 2001
From: li-zhihui <zhihui.li@intel.com>
Date: Fri, 8 Aug 2014 22:52:56 -0700
Subject: [PATCH] [SPARK-2635] Fix race condition at SchedulerBackend.isReady
 in standalone mode

In SPARK-1946 (PR #900), the configuration <code>spark.scheduler.minRegisteredExecutorsRatio</code> was introduced. However, in standalone mode there is a race condition where isReady() can return true prematurely, because totalExpectedExecutors has not yet been set correctly.

Because the number of expected executors is uncertain in standalone mode, this PR uses CPU cores (<code>--total-executor-cores</code>) as the expected resources to judge whether the SchedulerBackend is ready.
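
To make the new logic concrete, here is a minimal, self-contained sketch of the idea (the class names ReadinessSketch and StandaloneReadinessSketch are invented for illustration only and do not appear in this patch): the base backend exposes a sufficientResourcesRegistered() predicate that defaults to true, isReady() combines that predicate with the existing maxRegisteredWaitingTime fallback, and the standalone backend overrides the predicate to compare registered CPU cores against the expected total instead of counting executors.

    import java.util.concurrent.atomic.AtomicInteger

    // Simplified illustration only; not the real CoarseGrainedSchedulerBackend hierarchy.
    class ReadinessSketch(maxRegisteredWaitingTime: Long) {
      val totalCoreCount = new AtomicInteger(0)           // cores registered so far
      val totalRegisteredExecutors = new AtomicInteger(0) // executors registered so far
      private val createTime = System.currentTimeMillis()

      // Default: no resource requirement. Backends that know their expected
      // resources (executors on YARN, cores in standalone mode) override this.
      def sufficientResourcesRegistered(): Boolean = true

      def isReady(): Boolean = {
        if (sufficientResourcesRegistered()) return true
        // Fallback: proceed anyway once maxRegisteredWaitingTime milliseconds have elapsed.
        (System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime
      }
    }

    // Standalone mode: the expected resources are CPU cores (--total-executor-cores).
    class StandaloneReadinessSketch(totalExpectedCores: Int, minRegisteredRatio: Double, waitMs: Long)
      extends ReadinessSketch(waitMs) {
      override def sufficientResourcesRegistered(): Boolean =
        totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
    }

    object ReadinessSketchDemo extends App {
      // Arbitrary example values: 8 expected cores, 0.8 ratio, 30 s fallback.
      val backend = new StandaloneReadinessSketch(totalExpectedCores = 8, minRegisteredRatio = 0.8, waitMs = 30000)
      backend.totalCoreCount.addAndGet(4)
      println(backend.isReady()) // false: 4 < 8 * 0.8 and the timeout has not elapsed
      backend.totalCoreCount.addAndGet(4)
      println(backend.isReady()) // true: 8 >= 8 * 0.8
    }

In the patch itself, this takes the form of sufficientResourcesRegistered() in CoarseGrainedSchedulerBackend and its overrides in SparkDeploySchedulerBackend, YarnClientSchedulerBackend and YarnClusterSchedulerBackend, as the diff below shows.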

Author: li-zhihui <zhihui.li@intel.com>
Author: Li Zhihui <zhihui.li@intel.com>

Closes #1525 from li-zhihui/fixre4s and squashes the following commits:

e9a630b [Li Zhihui] Rename variable totalExecutors and clean codes
abf4860 [Li Zhihui] Push down variable totalExpectedResources to children classes
ca54bd9 [li-zhihui] Format log with String interpolation
88c7dc6 [li-zhihui] Few codes and docs refactor
41cf47e [li-zhihui] Fix race condition at SchedulerBackend.isReady in standalone mode
---
 .../CoarseGrainedSchedulerBackend.scala       | 30 +++++++++----------
 .../cluster/SparkDeploySchedulerBackend.scala |  6 +++-
 docs/configuration.md                         | 13 ++++----
 .../cluster/YarnClientSchedulerBackend.scala  |  9 ++++--
 .../cluster/YarnClusterSchedulerBackend.scala | 17 +++++++----
 5 files changed, 43 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9f085eef46..33500d967e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalRegisteredExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected executors) 
+  // Submit tasks only after (registered resources / total expected resources) 
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
-  if (minRegisteredRatio > 1) minRegisteredRatio = 1
-  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+  var minRegisteredRatio =
+    math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
+  // Submit tasks after maxRegisteredWaitingTime milliseconds
+  // if minRegisteredRatio has not yet been reached  
   val maxRegisteredWaitingTime =
-    conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+    conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
-  var ready = if (minRegisteredRatio <= 0) true else false
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -94,12 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
           executorAddress(executorId) = sender.path.address
           addressToExecutorId(sender.path.address) = executorId
           totalCoreCount.addAndGet(cores)
-          if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
-            ready = true
-            logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
-              executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
-              ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
-          }
+          totalRegisteredExecutors.addAndGet(1)
           makeOffers()
         }
 
@@ -268,14 +263,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     }
   }
 
+  def sufficientResourcesRegistered(): Boolean = true
+
   override def isReady(): Boolean = {
-    if (ready) {
+    if (sufficientResourcesRegistered) {
+      logInfo("SchedulerBackend is ready for scheduling beginning after " +
+        s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
       return true
     }
     if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
-      ready = true
       logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
-        "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+        s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
       return true
     }
     false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index a28446f6c8..589dba2e40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
   val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+  val totalExpectedCores = maxCores.getOrElse(0)
 
   override def start() {
     super.start()
@@ -97,7 +98,6 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
     memory: Int) {
-    totalExpectedExecutors.addAndGet(1)
     logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }
@@ -110,4 +110,8 @@ private[spark] class SparkDeploySchedulerBackend(
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason.toString)
   }
+
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
+  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 4d27c5a918..617a72a021 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -825,21 +825,22 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 </tr>
-  <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
+  <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
   <td>0</td>
   <td>
-    The minimum ratio of registered executors (registered executors / total expected executors)
+    The minimum ratio of registered resources (registered resources / total expected resources,
+    where resources are executors in yarn mode and CPU cores in standalone mode)
     to wait for before scheduling begins. Specified as a double between 0 and 1.
-    Regardless of whether the minimum ratio of executors has been reached,
+    Regardless of whether the minimum ratio of resources has been reached,
     the maximum amount of time it will wait before scheduling begins is controlled by config 
-    <code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code> 
+    <code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>.
   </td>
 </tr>
 <tr>
-  <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
+  <td><code>spark.scheduler.maxRegisteredResourcesWaitingTime</code></td>
   <td>30000</td>
   <td>
-    Maximum amount of time to wait for executors to register before scheduling begins
+    Maximum amount of time to wait for resources to register before scheduling begins
     (in milliseconds).  
   </td>
 </tr>
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index f8fb96b312..833e249f9f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -30,15 +30,15 @@ private[spark] class YarnClientSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
-  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
     minRegisteredRatio = 0.8
-    ready = false
   }
 
   var client: Client = null
   var appId: ApplicationId = null
   var checkerThread: Thread = null
   var stopping: Boolean = false
+  var totalExpectedExecutors = 0
 
   private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
       arrayBuf: ArrayBuffer[String]) {
@@ -84,7 +84,7 @@ private[spark] class YarnClientSchedulerBackend(
 
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
-    totalExpectedExecutors.set(args.numExecutors)
+    totalExpectedExecutors = args.numExecutors
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
@@ -150,4 +150,7 @@ private[spark] class YarnClientSchedulerBackend(
     logInfo("Stopped")
   }
 
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
+  }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 0ad1794d19..55665220a6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -27,19 +27,24 @@ private[spark] class YarnClusterSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
 
-  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+  var totalExpectedExecutors = 0
+  
+  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
     minRegisteredRatio = 0.8
-    ready = false
   }
 
   override def start() {
     super.start()
-    var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+    totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
     if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
+      totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
+        .getOrElse(totalExpectedExecutors)
     }
     // System property can override environment variable.
-    numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
-    totalExpectedExecutors.set(numExecutors)
+    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+  }
+
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 }
-- 
GitLab