From 1f4a648d4e30e837d6cf3ea8de1808e2254ad70b Mon Sep 17 00:00:00 2001
From: Sandy Ryza <sandy@cloudera.com>
Date: Wed, 10 Sep 2014 14:34:24 -0500
Subject: [PATCH] SPARK-1713. Use a thread pool for launching executors.

This patch copies the approach used in the MapReduce application master for launching containers.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #663 from sryza/sandy-spark-1713 and squashes the following commits:

036550d [Sandy Ryza] SPARK-1713. [YARN] Use a threadpool for launching executor containers
---
 docs/running-on-yarn.md                            |  7 +++++++
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 14 ++++++++++++--
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 943f06b114..d8b22f3663 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -125,6 +125,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
      the environment of the executor launcher. 
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.containerLauncherMaxThreads</code></td>
+  <td>25</td>
+  <td>
+    The maximum number of threads to use in the application master for launching executor containers.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 02b9a81bf6..0b8744f4b8 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.util.{List => JList}
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
@@ -32,6 +32,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
 object AllocationType extends Enumeration {
   type AllocationType = Value
   val HOST, RACK, ANY = Value
@@ -95,6 +97,14 @@ private[yarn] abstract class YarnAllocator(
   protected val (preferredHostToCount, preferredRackToCount) =
     generateNodeToWeight(conf, preferredNodes)
 
+  private val launcherPool = new ThreadPoolExecutor(
+    // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
+    sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
+    1, TimeUnit.MINUTES,
+    new LinkedBlockingQueue[Runnable](),
+    new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
+  launcherPool.allowCoreThreadTimeOut(true)
+
   def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
 
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
@@ -283,7 +293,7 @@ private[yarn] abstract class YarnAllocator(
             executorMemory,
             executorCores,
             securityMgr)
-          new Thread(executorRunnable).start()
+          launcherPool.execute(executorRunnable)
         }
       }
       logDebug("""
-- 
GitLab