Commit 3dd8af7a authored by li-zhihui, committed by Thomas Graves

[SPARK-1946] Submit tasks after (configured ratio) executors have been registered

Because submitting tasks and registering executors are asynchronous, in most situations tasks in early stages run without their preferred locality.

A simple workaround is to sleep for a few seconds in the application so that executors have enough time to register.

This PR adds two configuration properties that make the TaskScheduler submit tasks only after a configured fraction of executors has registered.

# Submit tasks only after (registered executors / total expected executors) has reached
# this ratio; the default value is 0.
spark.scheduler.minRegisteredExecutorsRatio = 0.8

# Regardless of whether minRegisteredExecutorsRatio has been reached, submit tasks once
# this waiting time (in milliseconds) has elapsed; the default value is 30000.
spark.scheduler.maxRegisteredExecutorsWaitingTime = 5000
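
For illustration, a minimal sketch of a driver that opts into the new behavior; the two property keys come from this patch, while the application name and the chosen values are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// Hold off scheduling until 80% of the expected executors have registered,
// but start anyway once 30 seconds have elapsed.
val conf = new SparkConf()
  .setAppName("locality-aware-app") // placeholder application name
  .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
  .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "30000")
val sc = new SparkContext(conf)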

Author: li-zhihui <zhihui.li@intel.com>

Closes #900 from li-zhihui/master and squashes the following commits:

b9f8326 [li-zhihui] Add logs & edit docs
1ac08b1 [li-zhihui] Add new configs to user docs
22ead12 [li-zhihui] Move waitBackendReady to postStartHook
c6f0522 [li-zhihui] Bug fix: numExecutors wasn't set & use constant DEFAULT_NUMBER_EXECUTORS
4d6d847 [li-zhihui] Move waitBackendReady to TaskSchedulerImpl.start & some code refactor
0ecee9a [li-zhihui] Move waitBackendReady from DAGScheduler.submitStage to TaskSchedulerImpl.submitTasks
4261454 [li-zhihui] Add docs for new configs & code style
ce0868a [li-zhihui] Code style, rename configuration property name of minRegisteredRatio & maxRegisteredWaitingTime
6cfb9ec [li-zhihui] Code style, revert default minRegisteredRatio of yarn to 0, driver get --num-executors in yarn/alpha
812c33c [li-zhihui] Fix driver lost --num-executors option in yarn-cluster mode
e7b6272 [li-zhihui] support yarn-cluster
37f7dc2 [li-zhihui] support yarn mode(percentage style)
3f8c941 [li-zhihui] submit stage after (configured ratio of) executors have been registered
parent d60b09bb
Showing 127 additions and 2 deletions
@@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
           throw new SparkException("YARN mode not available ?", e)
         }
       }
-      val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+      val backend = try {
+        val clazz =
+          Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
+        val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
+        cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+      } catch {
+        case e: Exception => {
+          throw new SparkException("YARN mode not available ?", e)
+        }
+      }
       scheduler.initialize(backend)
       scheduler
...
@@ -30,4 +30,5 @@ private[spark] trait SchedulerBackend {
   def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
     throw new UnsupportedOperationException
+  def isReady(): Boolean = true
 }
@@ -145,6 +145,10 @@ private[spark] class TaskSchedulerImpl(
     }
   }

+  override def postStartHook() {
+    waitBackendReady()
+  }
+
   override def submitTasks(taskSet: TaskSet) {
     val tasks = taskSet.tasks
     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
@@ -437,6 +441,17 @@ private[spark] class TaskSchedulerImpl(
   // By default, rack is unknown
   def getRackForHost(value: String): Option[String] = None

+  private def waitBackendReady(): Unit = {
+    if (backend.isReady) {
+      return
+    }
+    while (!backend.isReady) {
+      synchronized {
+        this.wait(100)
+      }
+    }
+  }
 }
...
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit tasks only after (registered executors / total expected executors)
+  // is equal to at least this value, which is a double between 0 and 1.
+  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+  if (minRegisteredRatio > 1) minRegisteredRatio = 1
+  // Regardless of whether minRegisteredExecutorsRatio has been reached, submit tasks after this time (milliseconds).
+  val maxRegisteredWaitingTime =
+    conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+  val createTime = System.currentTimeMillis()
+  var ready = if (minRegisteredRatio <= 0) true else false

   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -83,6 +93,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
         executorAddress(executorId) = sender.path.address
         addressToExecutorId(sender.path.address) = executorId
         totalCoreCount.addAndGet(cores)
+        if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
+          ready = true
+          logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
+            executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
+            ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
+        }
         makeOffers()
       }
@@ -247,6 +263,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
         throw new SparkException("Error notifying standalone scheduler's driver actor", e)
     }
   }
+
+  override def isReady(): Boolean = {
+    if (ready) {
+      return true
+    }
+    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
+      ready = true
+      logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
+        "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+      return true
+    }
+    false
+  }
 }

 private[spark] object CoarseGrainedSchedulerBackend {
...
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
   override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
     memory: Int) {
+    totalExpectedExecutors.addAndGet(1)
     logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }
...
@@ -699,6 +699,25 @@ Apart from these, the following properties are also available, and may be useful
     (in milliseconds)
   </td>
 </tr>
+<tr>
+  <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
+  <td>0</td>
+  <td>
+    The minimum ratio of registered executors (registered executors / total expected executors)
+    to wait for before scheduling begins, specified as a double between 0 and 1.
+    Regardless of whether the minimum ratio has been reached, the maximum amount of time
+    the scheduler waits before scheduling begins is controlled by
+    <code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
+  <td>30000</td>
+  <td>
+    Maximum amount of time to wait for executors to register before scheduling begins
+    (in milliseconds).
+  </td>
+</tr>
 </table>

 #### Security
...
@@ -184,6 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
+    System.setProperty("spark.executor.instances", args.numExecutors.toString)
     val mainMethod = Class.forName(
       args.userClass,
       false /* initialize */ ,
...
@@ -26,7 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024
   var executorCores = 1
-  var numExecutors = 2
+  var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS

   parseArgs(args.toList)
@@ -93,3 +93,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
     System.exit(exitCode)
   }
 }
+
+object ApplicationMasterArguments {
+  val DEFAULT_NUMBER_EXECUTORS = 2
+}
@@ -40,8 +40,10 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration)
   override def postStartHook() {
+    super.postStartHook()
     // The yarn application is running, but the executor might not yet ready
     // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+    // TODO This sleep should not be needed once waitBackendReady is in place
     Thread.sleep(2000L)
     logInfo("YarnClientClusterScheduler.postStartHook done")
   }
...
@@ -75,6 +75,7 @@ private[spark] class YarnClientSchedulerBackend(
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
+    totalExpectedExecutors.set(args.numExecutors)
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
...
@@ -48,9 +48,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   override def postStartHook() {
     val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
+    super.postStartHook()
     if (sparkContextInitialized){
       ApplicationMaster.waitForInitialAllocations()
       // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+      // TODO This sleep should not be needed once waitBackendReady is in place
       Thread.sleep(3000L)
     }
     logInfo("YarnClusterScheduler.postStartHook done")
...
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkContext
import org.apache.spark.deploy.yarn.ApplicationMasterArguments
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.IntParam

private[spark] class YarnClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {

  override def start() {
    super.start()
    var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
    if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
      numExecutors =
        IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
    }
    // System property can override environment variable.
    numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
    totalExpectedExecutors.set(numExecutors)
  }
}
@@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
+    System.setProperty("spark.executor.instances", args.numExecutors.toString)
     val mainMethod = Class.forName(
       args.userClass,
       false,
...