Skip to content
Snippets Groups Projects
Commit 5f419bf9 authored by Sandy Ryza's avatar Sandy Ryza Committed by Thomas Graves
Browse files

SPARK-1032. If Yarn app fails before registering, app master stays aroun...

...d long after

This reopens https://github.com/apache/incubator-spark/pull/648 against the new repo.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #28 from sryza/sandy-spark-1032 and squashes the following commits:

5953f50 [Sandy Ryza] SPARK-1032. If Yarn app fails before registering, app master stays around long after
parent edf8a56a
No related branches found
No related tags found
No related merge requests found
...@@ -65,6 +65,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, ...@@ -65,6 +65,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3)) math.max(args.numWorkers * 2, 3))
private var registered = false
def run() { def run() {
// Setup the directories so things go to yarn approved directories rather // Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp. // then user specified and /tmp.
...@@ -110,7 +112,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, ...@@ -110,7 +112,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
waitForSparkContextInitialized() waitForSparkContextInitialized()
// Do this after spark master is up and SparkContext is created so that we can register UI Url // Do this after spark master is up and SparkContext is created so that we can register UI Url
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() synchronized {
if (!isFinished) {
registerApplicationMaster()
registered = true
}
}
// Allocate all containers // Allocate all containers
allocateWorkers() allocateWorkers()
...@@ -208,7 +215,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, ...@@ -208,7 +215,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
var count = 0 var count = 0
val waitTime = 10000L val waitTime = 10000L
val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10) val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) { while (ApplicationMaster.sparkContextRef.get() == null && count < numTries
&& !isFinished) {
logInfo("Waiting for spark context initialization ... " + count) logInfo("Waiting for spark context initialization ... " + count)
count = count + 1 count = count + 1
ApplicationMaster.sparkContextRef.wait(waitTime) ApplicationMaster.sparkContextRef.wait(waitTime)
...@@ -341,17 +349,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, ...@@ -341,17 +349,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
return return
} }
isFinished = true isFinished = true
logInfo("finishApplicationMaster with " + status)
if (registered) {
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
finishReq.setDiagnostics(diagnostics)
// Set tracking url to empty since we don't have a history server.
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
}
} }
logInfo("finishApplicationMaster with " + status)
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
finishReq.setDiagnostics(diagnostics)
// Set tracking url to empty since we don't have a history server.
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
} }
/** /**
......
...@@ -67,6 +67,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, ...@@ -67,6 +67,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3)) math.max(args.numWorkers * 2, 3))
private var registered = false
def run() { def run() {
// Setup the directories so things go to YARN approved directories rather // Setup the directories so things go to YARN approved directories rather
// than user specified and /tmp. // than user specified and /tmp.
...@@ -99,7 +101,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, ...@@ -99,7 +101,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
waitForSparkContextInitialized() waitForSparkContextInitialized()
// Do this after Spark master is up and SparkContext is created so that we can register UI Url. // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() synchronized {
if (!isFinished) {
registerApplicationMaster()
registered = true
}
}
// Allocate all containers // Allocate all containers
allocateWorkers() allocateWorkers()
...@@ -180,7 +187,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, ...@@ -180,7 +187,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
var numTries = 0 var numTries = 0
val waitTime = 10000L val waitTime = 10000L
val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) { while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries
&& !isFinished) {
logInfo("Waiting for Spark context initialization ... " + numTries) logInfo("Waiting for Spark context initialization ... " + numTries)
numTries = numTries + 1 numTries = numTries + 1
ApplicationMaster.sparkContextRef.wait(waitTime) ApplicationMaster.sparkContextRef.wait(waitTime)
...@@ -313,11 +321,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, ...@@ -313,11 +321,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
return return
} }
isFinished = true isFinished = true
}
logInfo("finishApplicationMaster with " + status) logInfo("finishApplicationMaster with " + status)
// Set tracking URL to empty since we don't have a history server. if (registered) {
amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */) // Set tracking URL to empty since we don't have a history server.
amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
}
}
} }
/** /**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment