diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9f085eef46720dc3a3c4eaa60c547c5670c43eae..33500d967ebb1ebc80305dcd2ce5a34051a5fbb0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalRegisteredExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected executors) 
+  // Submit tasks only after (registered resources / total expected resources) 
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
-  if (minRegisteredRatio > 1) minRegisteredRatio = 1
-  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+  var minRegisteredRatio =
+    math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
+  // Submit tasks after maxRegisteredWaitingTime milliseconds
+  // if minRegisteredRatio has not yet been reached  
   val maxRegisteredWaitingTime =
-    conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+    conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
-  var ready = if (minRegisteredRatio <= 0) true else false
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -94,12 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
           executorAddress(executorId) = sender.path.address
           addressToExecutorId(sender.path.address) = executorId
           totalCoreCount.addAndGet(cores)
-          if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
-            ready = true
-            logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
-              executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
-              ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
-          }
+          totalRegisteredExecutors.addAndGet(1)
           makeOffers()
         }
 
@@ -268,14 +263,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     }
   }
 
+  def sufficientResourcesRegistered(): Boolean = true
+
   override def isReady(): Boolean = {
-    if (ready) {
+    if (sufficientResourcesRegistered) {
+      logInfo("SchedulerBackend is ready for scheduling beginning after " +
+        s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
       return true
     }
     if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
-      ready = true
       logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
-        "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+        s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
       return true
     }
     false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index a28446f6c8a6bf3c956c50763774bf3f0d44a761..589dba2e40d208365e517e8c181d88865e856f23 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
   val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+  val totalExpectedCores = maxCores.getOrElse(0)
 
   override def start() {
     super.start()
@@ -97,7 +98,6 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
     memory: Int) {
-    totalExpectedExecutors.addAndGet(1)
     logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }
@@ -110,4 +110,8 @@ private[spark] class SparkDeploySchedulerBackend(
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason.toString)
   }
+
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
+  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 4d27c5a918fe09c9bb0584bbe0512daff31d316d..617a72a021f6e939fa81ca2b30b492585e317022 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -825,21 +825,22 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 </tr>
-  <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
+  <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
   <td>0</td>
   <td>
-    The minimum ratio of registered executors (registered executors / total expected executors)
+    The minimum ratio of registered resources (registered resources / total expected resources)
+    (resources are executors in yarn mode, CPU cores in standalone mode)
     to wait for before scheduling begins. Specified as a double between 0 and 1.
-    Regardless of whether the minimum ratio of executors has been reached,
+    Regardless of whether the minimum ratio of resources has been reached,
     the maximum amount of time it will wait before scheduling begins is controlled by config 
-    <code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code> 
+    <code>spark.scheduler.maxRegisteredResourcesWaitingTime</code> 
   </td>
 </tr>
 <tr>
-  <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
+  <td><code>spark.scheduler.maxRegisteredResourcesWaitingTime</code></td>
   <td>30000</td>
   <td>
-    Maximum amount of time to wait for executors to register before scheduling begins
+    Maximum amount of time to wait for resources to register before scheduling begins
     (in milliseconds).  
   </td>
 </tr>
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index f8fb96b312f2344f8e03d496cb806eeb7ee58d5e..833e249f9f612645400966abf9d5a98e4e50a1c9 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -30,15 +30,15 @@ private[spark] class YarnClientSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
-  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
     minRegisteredRatio = 0.8
-    ready = false
   }
 
   var client: Client = null
   var appId: ApplicationId = null
   var checkerThread: Thread = null
   var stopping: Boolean = false
+  var totalExpectedExecutors = 0
 
   private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
       arrayBuf: ArrayBuffer[String]) {
@@ -84,7 +84,7 @@ private[spark] class YarnClientSchedulerBackend(
 
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
-    totalExpectedExecutors.set(args.numExecutors)
+    totalExpectedExecutors = args.numExecutors
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
@@ -150,4 +150,7 @@ private[spark] class YarnClientSchedulerBackend(
     logInfo("Stopped")
   }
 
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
+  }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 0ad1794d19538612f078583da9d91d02bf8afd1f..55665220a6f96a278406398a2776c9773df867d2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -27,19 +27,24 @@ private[spark] class YarnClusterSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
 
-  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+  var totalExpectedExecutors = 0
+  
+  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
     minRegisteredRatio = 0.8
-    ready = false
   }
 
   override def start() {
     super.start()
-    var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+    totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
     if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
+      totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
+        .getOrElse(totalExpectedExecutors)
     }
     // System property can override environment variable.
-    numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
-    totalExpectedExecutors.set(numExecutors)
+    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+  }
+
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 }