diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8f0ecb855718e4d0cc504a82ed3fb2fe2fc37a39..1cc9c33cd2d02f7b744bc75d7b4b9b35231f9f8e 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -277,7 +277,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         yarnAllocator.allocateContainers(
           math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
         ApplicationMaster.incrementAllocatorLoop(1)
-        Thread.sleep(100)
+        Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
       }
     } finally {
       // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
@@ -416,6 +416,7 @@ object ApplicationMaster {
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
   def incrementAllocatorLoop(by: Int) {
     val count = yarnAllocatorLoop.getAndAdd(by)
@@ -467,13 +468,22 @@ object ApplicationMaster {
       })
     }
 
-    // Wait for initialization to complete and atleast 'some' nodes can get allocated.
+    modified
+  }
+
+
+  /**
+   * Returns when we've either
+   *  1) received all the requested executors,
+   *  2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
+   *  3) hit an error that causes us to terminate trying to get containers.
+   */
+  def waitForInitialAllocations() {
     yarnAllocatorLoop.synchronized {
       while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
         yarnAllocatorLoop.wait(1000L)
       }
     }
-    modified
   }
 
   def main(argStrings: Array[String]) {
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 801e8b381588fecdfe21f525571a72c7812fa3cc..29a35680c0e72a246610bb7b3b04307d8836e541 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.yarn
 
 import java.io.File
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
-import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -37,7 +36,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
 import org.apache.spark.{Logging, SparkConf, SparkContext}
 
 /**
@@ -169,14 +168,13 @@ trait ClientBase extends Logging {
     destPath
   }
 
-  def qualifyForLocal(localURI: URI): Path = {
+  private def qualifyForLocal(localURI: URI): Path = {
     var qualifiedURI = localURI
-    // If not specified assume these are in the local filesystem to keep behavior like Hadoop
+    // If not specified, assume these are in the local filesystem to keep behavior like Hadoop
     if (qualifiedURI.getScheme() == null) {
       qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(qualifiedURI)).toString)
     }
-    val qualPath = new Path(qualifiedURI)
-    qualPath
+    new Path(qualifiedURI)
   }
 
   def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
@@ -305,13 +303,13 @@ trait ClientBase extends Logging {
 
     val amMemory = calculateAMMemory(newApp)
 
-    val JAVA_OPTS = ListBuffer[String]()
+    val javaOpts = ListBuffer[String]()
 
     // Add Xmx for AM memory
-    JAVA_OPTS += "-Xmx" + amMemory + "m"
+    javaOpts += "-Xmx" + amMemory + "m"
 
     val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-    JAVA_OPTS += "-Djava.io.tmpdir=" + tmpDir
+    javaOpts += "-Djava.io.tmpdir=" + tmpDir
 
     // TODO: Remove once cpuset version is pushed out.
     // The context is, default gc for server class machines ends up using all cores to do gc -
@@ -325,11 +323,11 @@ trait ClientBase extends Logging {
     if (useConcurrentAndIncrementalGC) {
       // In our expts, using (default) throughput collector has severe perf ramifications in
       // multi-tenant machines
-      JAVA_OPTS += "-XX:+UseConcMarkSweepGC"
-      JAVA_OPTS += "-XX:+CMSIncrementalMode"
-      JAVA_OPTS += "-XX:+CMSIncrementalPacing"
-      JAVA_OPTS += "-XX:CMSIncrementalDutyCycleMin=0"
-      JAVA_OPTS += "-XX:CMSIncrementalDutyCycle=10"
+      javaOpts += "-XX:+UseConcMarkSweepGC"
+      javaOpts += "-XX:+CMSIncrementalMode"
+      javaOpts += "-XX:+CMSIncrementalPacing"
+      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
+      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
     }
 
     // SPARK_JAVA_OPTS is deprecated, but for backwards compatibility:
@@ -344,22 +342,22 @@ trait ClientBase extends Logging {
       // If we are being launched in client mode, forward the spark-conf options
       // onto the executor launcher
       for ((k, v) <- sparkConf.getAll) {
-        JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+        javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
       }
     } else {
       // If we are being launched in standalone mode, capture and forward any spark
       // system properties (e.g. set by spark-class).
       for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
-        JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+        javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
       }
-      sys.props.get("spark.driver.extraJavaOptions").foreach(opts => JAVA_OPTS += opts)
-      sys.props.get("spark.driver.libraryPath").foreach(p => JAVA_OPTS += s"-Djava.library.path=$p")
+      sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts)
+      sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p")
     }
-    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
+    javaOpts += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
     val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
-      JAVA_OPTS ++
+      javaOpts ++
       Seq(args.amClass, "--class", args.userClass, "--jar ", args.userJar,
         userArgsToString(args),
         "--executor-memory", args.executorMemory.toString,
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 32f8861dc950378fe63aa375ae5845e227d60117..43dbb2464f9296fd0b686fe40f9ddcab7cd943ca 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{Logging, SparkConf}
 
@@ -46,19 +46,19 @@ trait ExecutorRunnableUtil extends Logging {
       executorCores: Int,
       localResources: HashMap[String, LocalResource]): List[String] = {
     // Extra options for the JVM
-    val JAVA_OPTS = ListBuffer[String]()
+    val javaOpts = ListBuffer[String]()
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
-    JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
+    javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
 
     // Set extra Java options for the executor, if defined
     sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
-      JAVA_OPTS += opts
+      javaOpts += opts
     }
 
-    JAVA_OPTS += "-Djava.io.tmpdir=" +
+    javaOpts += "-Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
+    javaOpts += ClientBase.getLog4jConfiguration(localResources)
 
     // Certain configs need to be passed here because they are needed before the Executor
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -66,10 +66,10 @@ trait ExecutorRunnableUtil extends Logging {
     // authentication settings.
     sparkConf.getAll.
       filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
-      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
 
     sparkConf.getAkkaConf.
-      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
@@ -88,11 +88,11 @@ trait ExecutorRunnableUtil extends Logging {
           // multi-tennent machines
           // The options are based on
           // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
-          JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
-          JAVA_OPTS += " -XX:+CMSIncrementalMode "
-          JAVA_OPTS += " -XX:+CMSIncrementalPacing "
-          JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
-          JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+          javaOpts += " -XX:+UseConcMarkSweepGC "
+          javaOpts += " -XX:+CMSIncrementalMode "
+          javaOpts += " -XX:+CMSIncrementalPacing "
+          javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
+          javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
         }
     */
 
@@ -104,7 +104,7 @@ trait ExecutorRunnableUtil extends Logging {
       // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
       // 'something' to fail job ... akin to blacklisting trackers in mapred ?
       "-XX:OnOutOfMemoryError='kill %p'") ++
-      JAVA_OPTS ++
+      javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       masterAddress.toString,
       slaveId.toString,
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index a4638cc863611c0e152be72f47aed0c222aba5f8..39cdd2e8a522bd15b574740d4a488d53b0809134 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -33,10 +33,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
 
   def this(sc: SparkContext) = this(sc, new Configuration())
 
-  // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
-  // Note that only the first creation of SparkContext influences (and ideally, there must be only one SparkContext, right ?)
-  // Subsequent creations are ignored - since nodes are already allocated by then.
-
+  // Nothing else for now ... initialize application master : which needs a SparkContext to
+  // determine how to allocate.
+  // Note that only the first creation of a SparkContext influences (and ideally, there must be
+  // only one SparkContext, right ?). Subsequent creations are ignored since executors are already
+  // allocated by then.
 
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
@@ -48,6 +49,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   override def postStartHook() {
     val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
     if (sparkContextInitialized){
+      ApplicationMaster.waitForInitialAllocations()
       // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
       Thread.sleep(3000L)
     }
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 33a60d978c586acfc2eae159ec93f4395aacbb30..6244332f237374b236bda070e4209b3ff4dc6c47 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,13 +19,12 @@ package org.apache.spark.deploy.yarn
 
 import java.io.IOException
 import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.JavaConversions._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.protocolrecords._
@@ -33,8 +32,7 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
@@ -77,17 +75,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     // than user specified and /tmp.
     System.setProperty("spark.local.dir", getLocalDirs())
 
-    // set the web ui port to be ephemeral for yarn so we don't conflict with
+    // Set the web ui port to be ephemeral for yarn so we don't conflict with
     // other spark processes running on the same box
     System.setProperty("spark.ui.port", "0")
 
-    // when running the AM, the Spark master is always "yarn-cluster"
+    // When running the AM, the Spark master is always "yarn-cluster"
     System.setProperty("spark.master", "yarn-cluster")
 
-    // Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using.
+    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
     ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
 
-    appAttemptId = getApplicationAttemptId()
+    appAttemptId = ApplicationMaster.getApplicationAttemptId()
+    logInfo("ApplicationAttemptId: " + appAttemptId)
     isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
     amClient = AMRMClient.createAMRMClient()
     amClient.init(yarnConf)
@@ -99,7 +98,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     ApplicationMaster.register(this)
 
     // Call this to force generation of secret so it gets populated into the
-    // hadoop UGI. This has to happen before the startUserClass which does a
+    // Hadoop UGI. This has to happen before the startUserClass which does a
     // doAs in order for the credentials to be passed on to the executor containers.
     val securityMgr = new SecurityManager(sparkConf)
 
@@ -121,7 +120,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     // Allocate all containers
     allocateExecutors()
 
-    // Wait for the user class to Finish
+    // Launch thread that will heartbeat to the RM so it won't think the app has died.
+    launchReporterThread()
+
+    // Wait for the user class to finish
     userThread.join()
 
     System.exit(0)
@@ -141,7 +143,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
   }
 
-  /** Get the Yarn approved local directories. */
+  // Get the Yarn approved local directories.
   private def getLocalDirs(): String = {
     // Hadoop 0.23 and 2.x have different Environment variable names for the
     // local dirs, so lets check both. We assume one of the 2 is set.
@@ -150,18 +152,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       .orElse(Option(System.getenv("LOCAL_DIRS")))
  
     localDirs match {
-      case None => throw new Exception("Yarn Local dirs can't be empty")
+      case None => throw new Exception("Yarn local dirs can't be empty")
       case Some(l) => l
     }
-  } 
-
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    appAttemptId
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
@@ -173,25 +166,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
       args.userClass,
-      false /* initialize */ ,
+      false,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
-
-      var successed = false
+        var succeeded = false
         try {
           // Copy
-          var mainArgs: Array[String] = new Array[String](args.userArgs.size)
+          val mainArgs = new Array[String](args.userArgs.size)
           args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
           mainMethod.invoke(null, mainArgs)
-          // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
-          // userThread will stop here unless it has uncaught exception thrown out
-          // It need shutdown hook to set SUCCEEDED
-          successed = true
+          // Some apps have "System.exit(0)" at the end.  The user thread will stop here unless
+          // it has an uncaught exception thrown out.  It needs a shutdown hook to set SUCCEEDED.
+          succeeded = true
         } finally {
-          logDebug("finishing main")
+          logDebug("Finishing main")
           isLastAMRetry = true
-          if (successed) {
+          if (succeeded) {
             ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
           } else {
             ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
@@ -199,11 +190,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         }
       }
     }
+    t.setName("Driver")
     t.start()
     t
   }
 
-  // This need to happen before allocateExecutors()
+  // This needs to happen before allocateExecutors()
   private def waitForSparkContextInitialized() {
     logInfo("Waiting for Spark context initialization")
     try {
@@ -231,7 +223,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
             sparkContext.preferredNodeLocationData,
             sparkContext.getConf)
         } else {
-          logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d".
+          logWarning("Unable to retrieve SparkContext in spite of waiting for %d, maxNumTries = %d".
             format(numTries * waitTime, maxNumTries))
           this.yarnAllocator = YarnAllocationHandler.newAllocator(
             yarnConf,
@@ -242,48 +234,37 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         }
       }
     } finally {
-      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT :
-      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks.
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+      // In case of exceptions, etc - ensure that the loop in
+      // ApplicationMaster#sparkContextInitialized() breaks.
+      ApplicationMaster.doneWithInitialAllocations()
     }
   }
 
   private def allocateExecutors() {
     try {
-      logInfo("Allocating " + args.numExecutors + " executors.")
-      // Wait until all containers have finished
+      logInfo("Requesting" + args.numExecutors + " executors.")
+      // Wait until all containers have launched
       yarnAllocator.addResourceRequests(args.numExecutors)
       yarnAllocator.allocateResources()
       // Exits the loop if the user thread exits.
+
+      var iters = 0
       while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
         checkNumExecutorsFailed()
         allocateMissingExecutor()
         yarnAllocator.allocateResources()
-        ApplicationMaster.incrementAllocatorLoop(1)
-        Thread.sleep(100)
+        if (iters == ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) {
+          ApplicationMaster.doneWithInitialAllocations()
+        }
+        Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
+        iters += 1
       }
     } finally {
-      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
-      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+      // In case of exceptions, etc - ensure that the loop in
+      // ApplicationMaster#sparkContextInitialized() breaks.
+      ApplicationMaster.doneWithInitialAllocations()
     }
     logInfo("All executors have launched.")
-
-    // Launch a progress reporter thread, else the app will get killed after expiration
-    // (def: 10mins) timeout.
-    if (userThread.isAlive) {
-      // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
-      val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
-      // we want to be reasonably responsive without causing too many requests to RM.
-      val schedulerInterval =
-        sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
-      // must be <= timeoutInterval / 2.
-      val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
-      launchReporterThread(interval)
-    }
   }
 
   private def allocateMissingExecutor() {
@@ -303,47 +284,35 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
   }
 
-  private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
+  private def launchReporterThread(): Thread = {
+    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+
+    // we want to be reasonably responsive without causing too many requests to RM.
+    val schedulerInterval =
+      sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+
+    // must be <= timeoutInterval / 2.
+    val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
 
     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
           checkNumExecutorsFailed()
           allocateMissingExecutor()
-          sendProgress()
-          Thread.sleep(sleepTime)
+          logDebug("Sending progress")
+          yarnAllocator.allocateResources()
+          Thread.sleep(interval)
         }
       }
     }
     // Setting to daemon status, though this is usually not a good idea.
     t.setDaemon(true)
     t.start()
-    logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+    logInfo("Started progress reporter thread - heartbeat interval : " + interval)
     t
   }
 
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // Simulated with an allocate request with no nodes requested.
-    yarnAllocator.allocateResources()
-  }
-
-  /*
-  def printContainers(containers: List[Container]) = {
-    for (container <- containers) {
-      logInfo("Launching shell command on a new container."
-        + ", containerId=" + container.getId()
-        + ", containerNode=" + container.getNodeId().getHost()
-        + ":" + container.getNodeId().getPort()
-        + ", containerNodeURI=" + container.getNodeHttpAddress()
-        + ", containerState" + container.getState()
-        + ", containerResourceMemory"
-        + container.getResource().getMemory())
-    }
-  }
-  */
-
   def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
     synchronized {
       if (isFinished) {
@@ -351,7 +320,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       }
       isFinished = true
 
-      logInfo("finishApplicationMaster with " + status)
+      logInfo("Unregistering ApplicationMaster with " + status)
       if (registered) {
         val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
         amClient.unregisterApplicationMaster(status, diagnostics, trackingUrl)
@@ -386,7 +355,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
     def run() {
       logInfo("AppMaster received a signal.")
-      // we need to clean up staging dir before HDFS is shut down
+      // We need to clean up staging dir before HDFS is shut down
       // make sure we don't delete it until this is the last AM
       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
     }
@@ -401,21 +370,24 @@ object ApplicationMaster {
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
 
   val sparkContextRef: AtomicReference[SparkContext] =
-    new AtomicReference[SparkContext](null /* initialValue */)
+    new AtomicReference[SparkContext](null)
 
-  val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
+  // Variable used to notify the YarnClusterScheduler that it should stop waiting
+  // for the initial set of executors to be started and get on with its business.
+  val doneWithInitialAllocationsMonitor = new Object()
 
-  def incrementAllocatorLoop(by: Int) {
-    val count = yarnAllocatorLoop.getAndAdd(by)
-    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
-      yarnAllocatorLoop.synchronized {
-        // to wake threads off wait ...
-        yarnAllocatorLoop.notifyAll()
-      }
+  @volatile var isDoneWithInitialAllocations = false
+
+  def doneWithInitialAllocations() {
+    isDoneWithInitialAllocations = true
+    doneWithInitialAllocationsMonitor.synchronized {
+      // to wake threads off wait ...
+      doneWithInitialAllocationsMonitor.notifyAll()
     }
   }
 
@@ -423,7 +395,10 @@ object ApplicationMaster {
     applicationMasters.add(master)
   }
 
-  // TODO(harvey): See whether this should be discarded - it isn't used anywhere atm...
+  /**
+   * Called from YarnClusterScheduler to notify the AM code that a SparkContext has been
+   * initialized in the user code.
+   */
   def sparkContextInitialized(sc: SparkContext): Boolean = {
     var modified = false
     sparkContextRef.synchronized {
@@ -431,7 +406,7 @@ object ApplicationMaster {
       sparkContextRef.notifyAll()
     }
 
-    // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
+    // Add a shutdown hook - as a best effort in case users do not call sc.stop or do
     // System.exit.
     // Should not really have to do this, but it helps YARN to evict resources earlier.
     // Not to mention, prevent the Client from declaring failure even though we exited properly.
@@ -454,13 +429,29 @@ object ApplicationMaster {
       })
     }
 
-    // Wait for initialization to complete and atleast 'some' nodes can get allocated.
-    yarnAllocatorLoop.synchronized {
-      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
-        yarnAllocatorLoop.wait(1000L)
+    // Wait for initialization to complete and at least 'some' nodes to get allocated.
+    modified
+  }
+
+  /**
+   * Returns when we've either
+   *  1) received all the requested executors,
+   *  2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
+   *  3) hit an error that causes us to terminate trying to get containers.
+   */
+  def waitForInitialAllocations() {
+    doneWithInitialAllocationsMonitor.synchronized {
+      while (!isDoneWithInitialAllocations) {
+        doneWithInitialAllocationsMonitor.wait(1000L)
       }
     }
-    modified
+  }
+
+  def getApplicationAttemptId(): ApplicationAttemptId = {
+    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+    val containerId = ConverterUtils.toContainerId(containerIdString)
+    val appAttemptId = containerId.getApplicationAttemptId()
+    appAttemptId
   }
 
   def main(argStrings: Array[String]) {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 393edd1f2d670ac1e7fb66c69aa24f8bf09b7424..24027618c1f350dc100bb78cd8809dc505836d70 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,14 +21,12 @@ import java.nio.ByteBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SparkConf}
 
@@ -102,7 +100,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 
   def logClusterResourceDetails() {
     val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics
-    logInfo("Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: " +
+    logInfo("Got Cluster metric info from ResourceManager, number of NodeManagers: " +
       clusterMetrics.getNumNodeManagers)
 
     val queueInfo: QueueInfo = yarnClient.getQueueInfo(args.amQueue)
@@ -133,7 +131,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 
   def submitApp(appContext: ApplicationSubmissionContext) = {
     // Submit the application to the applications manager.
-    logInfo("Submitting application to ASM")
+    logInfo("Submitting application to ResourceManager")
     yarnClient.submitApplication(appContext)
   }
 
@@ -149,7 +147,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
       Thread.sleep(interval)
       val report = yarnClient.getApplicationReport(appId)
 
-      logInfo("Application report from ASM: \n" +
+      logInfo("Application report from ResourceManager: \n" +
         "\t application identifier: " + appId.toString() + "\n" +
         "\t appId: " + appId.getId() + "\n" +
         "\t clientToAMToken: " + report.getClientToAMToken() + "\n" +
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index d93e5bb0225d5e1b15622bbbb769e7e33f32cd4e..4f8854a09a1e5e1c7e16e8503e2e6edaedbae950 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -72,8 +72,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     override def preStart() {
       logInfo("Listen to driver: " + driverUrl)
       driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established,
-      // thus we can monitor Lifecycle Events.
+      // Send a hello message to establish the connection, after which
+      // we can monitor Lifecycle Events.
       driver ! "Hello"
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     }
@@ -95,7 +95,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     amClient.init(yarnConf)
     amClient.start()
 
-    appAttemptId = getApplicationAttemptId()
+    appAttemptId = ApplicationMaster.getApplicationAttemptId()
     registerApplicationMaster()
 
     waitForSparkMaster()
@@ -141,18 +141,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }
   } 
 
-  private def getApplicationAttemptId(): ApplicationAttemptId = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    logInfo("ApplicationAttemptId: " + appAttemptId)
-    appAttemptId
-  }
-
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
     logInfo("Registering the ApplicationMaster")
-    // TODO:(Raymond) Find out Spark UI address and fill in here?
+    // TODO: Find out client's Spark UI address and fill in here?
     amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
   }
 
@@ -185,8 +176,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 
 
   private def allocateExecutors() {
-
-    // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
+    // TODO: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
     val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
       scala.collection.immutable.Map()
 
@@ -198,8 +188,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       preferredNodeLocationData,
       sparkConf)
 
-    logInfo("Allocating " + args.numExecutors + " executors.")
-    // Wait until all containers have finished
+    logInfo("Requesting " + args.numExecutors + " executors.")
+    // Wait until all containers have launched
     yarnAllocator.addResourceRequests(args.numExecutors)
     yarnAllocator.allocateResources()
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
@@ -221,7 +211,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }
   }
 
-  // TODO: We might want to extend this to allocate more containers in case they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
@@ -229,7 +218,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       override def run() {
         while (!driverClosed) {
           allocateMissingExecutor()
-          sendProgress()
+          logDebug("Sending progress")
+          yarnAllocator.allocateResources()
           Thread.sleep(sleepTime)
         }
       }
@@ -241,20 +231,14 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     t
   }
 
-  private def sendProgress() {
-    logDebug("Sending progress")
-    // simulated with an allocate request with no nodes requested ...
-    yarnAllocator.allocateResources()
-  }
-
   def finishApplicationMaster(status: FinalApplicationStatus) {
-    logInfo("finish ApplicationMaster with " + status)
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+    logInfo("Unregistering ApplicationMaster with " + status)
+    val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
   }
 
 }
 
-
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)