diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 062f946a9fe93d57d499f7871a5eb6c17ada3d1b..3ec36487dcd2678cbba456f14dcf26a904f7ff5b 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -255,10 +255,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
             sparkContext.getConf)
         }
       }
-    } finally {
-      // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
-      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
   }
 
@@ -277,13 +273,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         }
         yarnAllocator.allocateContainers(
           math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
-        ApplicationMaster.incrementAllocatorLoop(1)
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
       }
-    } finally {
-      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
-      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
     logInfo("All executors have launched.")
 
@@ -411,24 +402,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 }
 
 object ApplicationMaster extends Logging {
-  // Number of times to wait for the allocator loop to complete.
-  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
-  // This is to ensure that we have reasonable number of containers before we start
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
-  def incrementAllocatorLoop(by: Int) {
-    val count = yarnAllocatorLoop.getAndAdd(by)
-    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
-      yarnAllocatorLoop.synchronized {
-        // to wake threads off wait ...
-        yarnAllocatorLoop.notifyAll()
-      }
-    }
-  }
-
   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
 
   def register(master: ApplicationMaster) {
@@ -437,7 +414,6 @@ object ApplicationMaster extends Logging {
 
   val sparkContextRef: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null /* initialValue */)
-  val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
 
   def sparkContextInitialized(sc: SparkContext): Boolean = {
     var modified = false
@@ -472,21 +448,6 @@ object ApplicationMaster extends Logging {
     modified
   }
 
-
-  /**
-   * Returns when we've either
-   *  1) received all the requested executors,
-   *  2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
-   *  3) hit an error that causes us to terminate trying to get containers.
-   */
-  def waitForInitialAllocations() {
-    yarnAllocatorLoop.synchronized {
-      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
-        yarnAllocatorLoop.wait(1000L)
-      }
-    }
-  }
-
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     val args = new ApplicationMasterArguments(argStrings)
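
For context, here is a minimal, slightly simplified sketch of the handshake that the hunks above delete from the alpha `ApplicationMaster` (this is illustrative code, not the exact removed source): the allocator loop bumps a shared counter on every 100 ms heartbeat, and `waitForInitialAllocations()` blocks the scheduler thread until the counter clears `ALLOCATOR_LOOP_WAIT_COUNT`, with the `finally` blocks force-incrementing past the threshold on error paths so the waiter cannot hang.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Illustrative stand-in for the removed wait/notify handshake. Names mirror the
// deleted members (incrementAllocatorLoop, waitForInitialAllocations,
// ALLOCATOR_LOOP_WAIT_COUNT); the threshold check is slightly simplified.
object AllocatorWaitSketch {
  private val ALLOCATOR_LOOP_WAIT_COUNT = 30   // 30 iterations * 100 ms heartbeat ~= 3 seconds
  private val counter = new AtomicInteger(0)

  // Called by the allocator thread once per heartbeat, and with
  // ALLOCATOR_LOOP_WAIT_COUNT from the finally blocks on failure.
  def incrementAllocatorLoop(by: Int): Unit = {
    val total = counter.addAndGet(by)
    if (total >= ALLOCATOR_LOOP_WAIT_COUNT) {
      counter.synchronized { counter.notifyAll() }  // wake any thread parked in waitForInitialAllocations
    }
  }

  // Called from the scheduler's postStartHook path; returns once enough
  // heartbeats have elapsed (or an error path pushed the counter past the limit).
  def waitForInitialAllocations(): Unit = {
    counter.synchronized {
      while (counter.get() < ALLOCATOR_LOOP_WAIT_COUNT) {
        counter.wait(1000L)  // re-check periodically in case a notify was missed
      }
    }
  }
}
```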
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 15e8c21aa5906c8e0e9444a633e9a746dfd1216a..3474112ded5d71343bd6a55ff42d00bbfa7e76e6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -37,14 +37,4 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
     val retval = YarnAllocationHandler.lookupRack(conf, host)
     if (retval != null) Some(retval) else None
   }
-
-  override def postStartHook() {
-
-    super.postStartHook()
-    // The yarn application is running, but the executor might not yet ready
-    // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
-    // TODO It needn't after waitBackendReady
-    Thread.sleep(2000L)
-    logInfo("YarnClientClusterScheduler.postStartHook done")
-  }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 1b37c4bb13f4993790b6059e763ada5acb8160b4..d8266f7b0c9a711dca497370186bd719d904b513 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -30,6 +30,11 @@ private[spark] class YarnClientSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
+  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+    minRegisteredRatio = 0.8
+    ready = false
+  }
+
   var client: Client = null
   var appId: ApplicationId = null
 
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 9ee53d797c8eaa628bb507da62d2ec07c1e613ad..9aeca4a637d383296a1c84b0001bd212174aeca0 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -47,14 +47,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   }
 
   override def postStartHook() {
-    val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
+    ApplicationMaster.sparkContextInitialized(sc)
     super.postStartHook()
-    if (sparkContextInitialized){
-      ApplicationMaster.waitForInitialAllocations()
-      // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
-      // TODO It needn't after waitBackendReady
-      Thread.sleep(3000L)
-    }
     logInfo("YarnClusterScheduler.postStartHook done")
   }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index a04b08f43cc5a8488c8ff9af0e6cd555491d9c98..0ad1794d19538612f078583da9d91d02bf8afd1f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -27,6 +27,11 @@ private[spark] class YarnClusterSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
 
+  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+    minRegisteredRatio = 0.8
+    ready = false
+  }
+
   override def start() {
     super.start()
     var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
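
The two backend changes above replace that fixed wait (and the hard-coded `Thread.sleep` calls removed from the schedulers) with a readiness ratio. Below is a minimal sketch of the gating idea, not Spark's actual `CoarseGrainedSchedulerBackend`: scheduling is held back until a fraction `minRegisteredRatio` of the requested executors has registered, or a wait cap expires. The field names `minRegisteredRatio` and `ready` and the config key `spark.scheduler.minRegisteredExecutorsRatio` come from the diff; the timeout value and helper names here are assumptions for illustration.

```scala
// Sketch of the readiness gate the YARN backends now opt into by defaulting
// minRegisteredRatio to 0.8 when the user has not set the config key.
class ReadinessGateSketch(totalExpectedExecutors: Int) {
  @volatile var minRegisteredRatio: Double = 0.0   // YARN backends above default this to 0.8
  @volatile var ready: Boolean = true              // set to false when the ratio should be enforced
  @volatile private var registeredExecutors: Int = 0
  private val maxWaitMs = 30000L                   // assumed upper bound so startup cannot hang forever
  private val createTime = System.currentTimeMillis()

  // Called whenever an executor registers with the driver.
  def executorRegistered(): Unit = synchronized {
    registeredExecutors += 1
    if (registeredExecutors >= minRegisteredRatio * totalExpectedExecutors) {
      ready = true
    }
  }

  def isReady(): Boolean =
    ready || System.currentTimeMillis() - createTime >= maxWaitMs

  // Rough equivalent of the waitBackendReady step the removed TODO comments
  // refer to: the scheduler polls for readiness instead of sleeping 2-3 seconds.
  def waitBackendReady(): Unit = {
    while (!isReady()) Thread.sleep(100)
  }
}
```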
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1a24ec759b54686030ee4cc0eb5cda7d23e55f90..eaf594c8b49b98a77fc11d41baef92f778dfbb62 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -234,10 +234,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
             sparkContext.getConf)
         }
       }
-    } finally {
-      // In case of exceptions, etc - ensure that the loop in
-      // ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.doneWithInitialAllocations()
     }
   }
 
@@ -254,16 +250,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         checkNumExecutorsFailed()
         allocateMissingExecutor()
         yarnAllocator.allocateResources()
-        if (iters == ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) {
-          ApplicationMaster.doneWithInitialAllocations()
-        }
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
         iters += 1
       }
-    } finally {
-      // In case of exceptions, etc - ensure that the loop in
-      // ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.doneWithInitialAllocations()
     }
     logInfo("All executors have launched.")
   }
@@ -365,12 +354,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 }
 
 object ApplicationMaster extends Logging {
-  // Number of times to wait for the allocator loop to complete.
-  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
-  // This is to ensure that we have reasonable number of containers before we start
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
@@ -378,20 +363,6 @@ object ApplicationMaster extends Logging {
   val sparkContextRef: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null)
 
-  // Variable used to notify the YarnClusterScheduler that it should stop waiting
-  // for the initial set of executors to be started and get on with its business.
-  val doneWithInitialAllocationsMonitor = new Object()
-
-  @volatile var isDoneWithInitialAllocations = false
-
-  def doneWithInitialAllocations() {
-    isDoneWithInitialAllocations = true
-    doneWithInitialAllocationsMonitor.synchronized {
-      // to wake threads off wait ...
-      doneWithInitialAllocationsMonitor.notifyAll()
-    }
-  }
-
   def register(master: ApplicationMaster) {
     applicationMasters.add(master)
   }
@@ -434,20 +405,6 @@ object ApplicationMaster extends Logging {
     modified
   }
 
-  /**
-   * Returns when we've either
-   *  1) received all the requested executors,
-   *  2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
-   *  3) hit an error that causes us to terminate trying to get containers.
-   */
-  def waitForInitialAllocations() {
-    doneWithInitialAllocationsMonitor.synchronized {
-      while (!isDoneWithInitialAllocations) {
-        doneWithInitialAllocationsMonitor.wait(1000L)
-      }
-    }
-  }
-
   def getApplicationAttemptId(): ApplicationAttemptId = {
     val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
     val containerId = ConverterUtils.toContainerId(containerIdString)