Commit 323a83c5 authored by GuoQiang Li, committed by Thomas Graves

[SPARK-2037]: yarn client mode doesn't support spark.yarn.max.executor.failures

Author: GuoQiang Li <witgo@qq.com>

Closes #1180 from witgo/SPARK-2037 and squashes the following commits:

3d52411 [GuoQiang Li] review commit
7058f4d [GuoQiang Li] Correctly stop SparkContext
6d0561f [GuoQiang Li] Fix: yarn client mode doesn't support spark.yarn.max.executor.failures
parent c960b505
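For context, this patch makes yarn-client mode enforce the same executor-failure limit that yarn-cluster mode already honors. A minimal sketch of how an application might tune the threshold; the key name comes from the diff below, while the app name and value here are hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical yarn-client application that tunes the failure limit.
val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("failure-limit-demo")
  // Fail the whole application after 10 executor failures instead of
  // allocating replacements forever (default in the diff: max(numExecutors * 2, 3)).
  .set("spark.yarn.max.executor.failures", "10")

val sc = new SparkContext(conf)
```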
@@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -57,10 +56,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
+  private var driverClosed: Boolean = false
+  private var isFinished: Boolean = false
+  private var registered: Boolean = false
+
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
   val securityManager = new SecurityManager(sparkConf)
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf, securityManager = securityManager)._1
   var actor: ActorRef = _
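The default above layers two lookups so the deprecated `spark.yarn.max.worker.failures` key keeps working. A standalone sketch of the same fallback pattern; the `Conf` class here is a stand-in for `SparkConf`, not the real API:

```scala
// Stand-in for SparkConf#getInt: look up a key or fall back to a default.
class Conf(settings: Map[String, String]) {
  def getInt(key: String, defaultValue: Int): Int =
    settings.get(key).map(_.toInt).getOrElse(defaultValue)
}

val conf = new Conf(Map("spark.yarn.max.worker.failures" -> "8"))
val numExecutors = 2

// New key wins when set; else the deprecated key; else the computed default.
val maxNumExecutorFailures = conf.getInt("spark.yarn.max.executor.failures",
  conf.getInt("spark.yarn.max.worker.failures", math.max(numExecutors * 2, 3)))

assert(maxNumExecutorFailures == 8) // deprecated key is still honored
```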
@@ -97,23 +103,26 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
-    // Compute number of threads for akka
-    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
-    if (minimumMemory > 0) {
-      val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-        YarnAllocationHandler.MEMORY_OVERHEAD)
-      val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
-      if (numCore > 0) {
-        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
-        // TODO: Uncomment when hadoop is on a version which has this fixed.
-        // args.workerCores = numCore
-      }
-    }
+    synchronized {
+      if (!isFinished) {
+        val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+        // Compute number of threads for akka
+        val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+        if (minimumMemory > 0) {
+          val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+            YarnAllocationHandler.MEMORY_OVERHEAD)
+          val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+          if (numCore > 0) {
+            // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+            // TODO: Uncomment when hadoop is on a version which has this fixed.
+            // args.workerCores = numCore
+          }
+        }
+        registered = true
+      }
+    }
 
     waitForSparkMaster()
     addAmIpFilter()
     // Allocate all containers
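Registration is wrapped in `synchronized` and gated on `isFinished` so that a failure path which finishes the AM early can never race with registration, and `registered` records whether unregistration will later be needed. A minimal stand-alone sketch of that handshake; `AmLifecycle` and its comments are stand-ins, not Spark APIs:

```scala
// Stand-in lifecycle showing the same guard: register only if not already
// finished, unregister only if registration actually happened.
class AmLifecycle {
  private var registered = false
  private var isFinished = false

  def register(): Unit = synchronized {
    if (!isFinished) {
      // ... registerApplicationMaster() with the RM would go here ...
      registered = true
    }
  }

  def finish(diagnostics: String = ""): Unit = synchronized {
    if (!isFinished) {
      if (registered) {
        // ... unregister with the RM, passing `diagnostics` ...
      }
      isFinished = true // later calls become no-ops
    }
  }
}
```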
@@ -243,11 +252,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
       yarnAllocator.allocateContainers(
         math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+      checkNumExecutorsFailed()
       Thread.sleep(100)
     }
 
     logInfo("All executors have launched.")
   }
 
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
+  }
+
   // TODO: We might want to extend this to allocate more containers in case they die !
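`checkNumExecutorsFailed` is polled from both the startup loop and the reporter thread, so a flood of executor failures aborts the AM no matter which phase it is in. A self-contained sketch of the same threshold check; the `Allocator` stand-in and the `abort` callback are hypothetical:

```scala
// Hypothetical allocator stand-in exposing only the failed-executor counter.
class Allocator { @volatile var numExecutorsFailed: Int = 0 }

// Same shape as checkNumExecutorsFailed: compare the counter to the limit
// and abort through a caller-supplied callback once it is crossed.
def checkFailures(allocator: Allocator, maxFailures: Int)(abort: String => Unit): Unit =
  if (allocator.numExecutorsFailed >= maxFailures) {
    abort("max number of executor failures reached")
  }

// Polled periodically, e.g. every 100 ms alongside container allocation:
// checkFailures(allocator, maxNumExecutorFailures)(msg => finish(FAILED, msg))
```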
@@ -257,6 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
+          checkNumExecutorsFailed()
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
             logInfo("Allocating " + missingExecutorCount +
@@ -282,15 +298,23 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     yarnAllocator.allocateContainers(0)
   }
 
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-    logInfo("finish ApplicationMaster with " + status)
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    finishReq.setFinishApplicationStatus(status)
-    finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", ""))
-    resourceManager.finishApplicationMaster(finishReq)
+  def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+    synchronized {
+      if (isFinished) {
+        return
+      }
+      logInfo("Unregistering ApplicationMaster with " + status)
+      if (registered) {
+        val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+          .asInstanceOf[FinishApplicationMasterRequest]
+        finishReq.setAppAttemptId(appAttemptId)
+        finishReq.setFinishApplicationStatus(status)
+        finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
+        finishReq.setDiagnostics(appMessage)
+        resourceManager.finishApplicationMaster(finishReq)
+      }
+      isFinished = true
+    }
   }
 }
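The rewritten `finishApplicationMaster` is safe to call from multiple paths (the failure check, the shutdown path, the driver-disconnect handler): the first call unregisters, later calls return immediately. A usage-style sketch against the hypothetical `AmLifecycle` stand-in from earlier:

```scala
val am = new AmLifecycle()
am.register()
am.finish("max number of executor failures reached") // unregisters exactly once
am.finish() // no-op: isFinished is already true
```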
@@ -37,6 +37,8 @@ private[spark] class YarnClientSchedulerBackend(
   var client: Client = null
   var appId: ApplicationId = null
+  var checkerThread: Thread = null
+  var stopping: Boolean = false
 
   private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
     arrayBuf: ArrayBuffer[String]) {
@@ -86,6 +88,7 @@ private[spark] class YarnClientSchedulerBackend(
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
+    checkerThread = yarnApplicationStateCheckerThread()
   }
 
   def waitForApp() {
@@ -116,7 +119,32 @@ private[spark] class YarnClientSchedulerBackend(
     }
   }
 
+  private def yarnApplicationStateCheckerThread(): Thread = {
+    val t = new Thread {
+      override def run() {
+        while (!stopping) {
+          val report = client.getApplicationReport(appId)
+          val state = report.getYarnApplicationState()
+          if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED
+            || state == YarnApplicationState.FAILED) {
+            logError(s"Yarn application already ended: $state")
+            sc.stop()
+            stopping = true
+          }
+          Thread.sleep(1000L)
+        }
+        checkerThread = null
+        Thread.currentThread().interrupt()
+      }
+    }
+    t.setName("Yarn Application State Checker")
+    t.setDaemon(true)
+    t.start()
+    t
+  }
+
   override def stop() {
+    stopping = true
     super.stop()
     client.stop
     logInfo("Stopped")
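The new checker thread polls the YARN application report once a second and stops the SparkContext if the application has ended behind the driver's back; `stop()` sets the same flag first so the monitor winds down with the backend. A generic sketch of that daemon-poller shape, assuming hypothetical `poll`/`isDead`/`onDead` stand-ins rather than the real YARN client calls:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Generic daemon poller in the shape of yarnApplicationStateCheckerThread:
// poll() fetches remote state, isDead decides, onDead reacts exactly once.
def startMonitor[S](stopping: AtomicBoolean)(
    poll: () => S, isDead: S => Boolean, onDead: S => Unit): Thread = {
  val t = new Thread {
    override def run(): Unit = {
      while (!stopping.get()) {
        val state = poll()
        if (isDead(state)) {
          onDead(state)       // the diff calls sc.stop() here
          stopping.set(true)
        }
        Thread.sleep(1000L)   // poll once per second, as in the diff
      }
    }
  }
  t.setName("Yarn Application State Checker")
  t.setDaemon(true)           // daemon: never keeps the JVM alive on its own
  t.start()
  t
}
```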
@@ -19,15 +19,12 @@ package org.apache.spark.deploy.yarn
 import java.net.Socket
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -57,10 +54,16 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
+  private var driverClosed: Boolean = false
+  private var isFinished: Boolean = false
+  private var registered: Boolean = false
   private var amClient: AMRMClient[ContainerRequest] = _
 
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+
   val securityManager = new SecurityManager(sparkConf)
   val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf, securityManager = securityManager)._1
@@ -101,7 +104,12 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     amClient.start()
 
     appAttemptId = ApplicationMaster.getApplicationAttemptId()
-    registerApplicationMaster()
+    synchronized {
+      if (!isFinished) {
+        registerApplicationMaster()
+        registered = true
+      }
+    }
 
     waitForSparkMaster()
     addAmIpFilter()
@@ -210,6 +218,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     yarnAllocator.addResourceRequests(args.numExecutors)
     yarnAllocator.allocateResources()
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+      checkNumExecutorsFailed()
       allocateMissingExecutor()
       yarnAllocator.allocateResources()
       Thread.sleep(100)
@@ -228,12 +237,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }
   }
 
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
+  }
+
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
+          checkNumExecutorsFailed()
           allocateMissingExecutor()
           logDebug("Sending progress")
           yarnAllocator.allocateResources()
@@ -248,10 +265,18 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     t
   }
 
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-    logInfo("Unregistering ApplicationMaster with " + status)
-    val trackingUrl = sparkConf.get("spark.driver.appUIHistoryAddress", "")
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
+  def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+    synchronized {
+      if (isFinished) {
+        return
+      }
+      logInfo("Unregistering ApplicationMaster with " + status)
+      if (registered) {
+        val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+        amClient.unregisterApplicationMaster(status, appMessage, trackingUrl)
+      }
+      isFinished = true
+    }
   }
 }
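Both launchers now read the tracking URL from `spark.yarn.historyServer.address` rather than the old `spark.driver.appUIHistoryAddress`, so the YARN RM UI can link to the application's history after it finishes. A hedged configuration sketch; the host and port are placeholders:

```scala
import org.apache.spark.SparkConf

// Placeholder host and port; point this at a running Spark history server.
val conf = new SparkConf().set("spark.yarn.historyServer.address", "historyhost:18080")
```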