From aa40c4420717aa06a7964bd30b428fb73548beb2 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin <vanzin@cloudera.com>
Date: Wed, 3 Jun 2015 14:59:30 -0700
Subject: [PATCH] [SPARK-8059] [YARN] Wake up allocation thread when new
 requests arrive.

This should help reduce latency for new executor allocations.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #6600 from vanzin/SPARK-8059 and squashes the following commits:

8387a3a [Marcelo Vanzin] [SPARK-8059] [yarn] Wake up allocation thread when new requests arrive.
---
 .../spark/deploy/yarn/ApplicationMaster.scala    | 16 +++++++++++++---
 .../apache/spark/deploy/yarn/YarnAllocator.scala |  7 ++++++-
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 760e458972..002d7b6eaf 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -67,6 +67,7 @@ private[spark] class ApplicationMaster(
 
   @volatile private var reporterThread: Thread = _
   @volatile private var allocator: YarnAllocator = _
+  private val allocatorLock = new Object()
 
   // Fields used in client mode.
   private var rpcEnv: RpcEnv = null
@@ -359,7 +360,9 @@ private[spark] class ApplicationMaster(
               }
             logDebug(s"Number of pending allocations is $numPendingAllocate. " +
                      s"Sleeping for $sleepInterval.")
-            Thread.sleep(sleepInterval)
+            allocatorLock.synchronized {
+              allocatorLock.wait(sleepInterval)
+            }
           } catch {
             case e: InterruptedException =>
           }
@@ -546,8 +549,15 @@ private[spark] class ApplicationMaster(
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case RequestExecutors(requestedTotal) =>
         Option(allocator) match {
-          case Some(a) => a.requestTotalExecutors(requestedTotal)
-          case None => logWarning("Container allocator is not ready to request executors yet.")
+          case Some(a) =>
+            allocatorLock.synchronized {
+              if (a.requestTotalExecutors(requestedTotal)) {
+                allocatorLock.notifyAll()
+              }
+            }
+
+          case None =>
+            logWarning("Container allocator is not ready to request executors yet.")
         }
         context.reply(true)
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 21193e7c62..940873fbd0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -146,11 +146,16 @@ private[yarn] class YarnAllocator(
    * Request as many executors from the ResourceManager as needed to reach the desired total. If
    * the requested total is smaller than the current number of running executors, no executors will
    * be killed.
+   *
+   * @return Whether the new requested total is different than the old value.
    */
-  def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+  def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized {
     if (requestedTotal != targetNumExecutors) {
       logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
       targetNumExecutors = requestedTotal
+      true
+    } else {
+      false
     }
   }
 
-- 
GitLab