diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 51c13391650242b4bb4fe1725bcb42037f5f0116..9d55f435e80adcdc5343b83035d7af90c58f91f2 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -71,9 +71,22 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
 </tr>
 <tr>
   <td><code>spark.yarn.scheduler.heartbeat.interval-ms</code></td>
-  <td>5000</td>
+  <td>3000</td>
   <td>
     The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager.
+    The value is capped at half the value of YARN's configuration for the expiry interval
+    (<code>yarn.am.liveness-monitor.expiry-interval-ms</code>).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.scheduler.initial-allocation.interval</code></td>
+  <td>200ms</td>
+  <td>
+    The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager
+    when there are pending container allocation requests. It should be no larger than
+    <code>spark.yarn.scheduler.heartbeat.interval-ms</code>. The allocation interval will doubled on
+    successive eager heartbeats if pending containers still exist, until
+    <code>spark.yarn.scheduler.heartbeat.interval-ms</code> is reached.
   </td>
 </tr>
 <tr>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 29752969e6152a9b90803fcc7b5c87d71443536c..63a6f2e9472c1919cbd59076ccd13ed5768037d3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -300,11 +300,14 @@ private[spark] class ApplicationMaster(
     val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
 
     // we want to be reasonably responsive without causing too many requests to RM.
-    val schedulerInterval =
-      sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "5s")
+    val heartbeatInterval = math.max(0, math.min(expiryInterval / 2,
+      sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
 
-    // must be <= expiryInterval / 2.
-    val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
+    // we want to check more frequently for pending containers
+    val initialAllocationInterval = math.min(heartbeatInterval,
+      sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
+
+    var nextAllocationInterval = initialAllocationInterval
 
     // The number of failures in a row until Reporter thread give up
     val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
@@ -330,15 +333,27 @@ private[spark] class ApplicationMaster(
               if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
                 finish(FinalApplicationStatus.FAILED,
                   ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
-                    s"${failureCount} time(s) from Reporter thread.")
-
+                    s"$failureCount time(s) from Reporter thread.")
               } else {
-                logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
+                logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
               }
             }
           }
           try {
-            Thread.sleep(interval)
+            val numPendingAllocate = allocator.getNumPendingAllocate
+            val sleepInterval =
+              if (numPendingAllocate > 0) {
+                val currentAllocationInterval =
+                  math.min(heartbeatInterval, nextAllocationInterval)
+                nextAllocationInterval *= 2
+                currentAllocationInterval
+              } else {
+                nextAllocationInterval = initialAllocationInterval
+                heartbeatInterval
+              }
+            logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+                     s"Sleeping for $sleepInterval.")
+            Thread.sleep(sleepInterval)
           } catch {
             case e: InterruptedException =>
           }
@@ -349,7 +364,8 @@ private[spark] class ApplicationMaster(
     t.setDaemon(true)
     t.setName("Reporter")
     t.start()
-    logInfo("Started progress reporter thread - sleep time : " + interval)
+    logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
+            s"initial allocation : $initialAllocationInterval) intervals")
     t
   }