diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 943f06b114cb92b6695fa2718882031aa53c0a0f..d8b22f3663d08de0e6631347d30a5e7832aca43f 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -125,6 +125,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
      the environment of the executor launcher. 
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.containerLauncherMaxThreads</code></td>
+  <td>25</td>
+  <td>
+    The maximum number of threads to use in the application master for launching executor containers.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 02b9a81bf6b502a4c29688398ec06bc6ebdeb027..0b8744f4b8bdf420726615996a927b8e17edb36b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.util.{List => JList}
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
@@ -32,6 +32,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
 object AllocationType extends Enumeration {
   type AllocationType = Value
   val HOST, RACK, ANY = Value
@@ -95,6 +97,14 @@ private[yarn] abstract class YarnAllocator(
   protected val (preferredHostToCount, preferredRackToCount) =
     generateNodeToWeight(conf, preferredNodes)
 
+  private val launcherPool = new ThreadPoolExecutor(
+    // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
+    sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
+    1, TimeUnit.MINUTES,
+    new LinkedBlockingQueue[Runnable](),
+    new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
+  launcherPool.allowCoreThreadTimeOut(true)
+
   def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
 
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
@@ -283,7 +293,7 @@ private[yarn] abstract class YarnAllocator(
             executorMemory,
             executorCores,
             securityMgr)
-          new Thread(executorRunnable).start()
+          launcherPool.execute(executorRunnable)
         }
       }
       logDebug("""