Skip to content
Snippets Groups Projects
Commit c7877d5e authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #689 from BlackNiuza/application_status

Bug fix: SPARK-796
parents 10c05937 00556a94
No related branches found
No related tags found
No related merge requests found
......@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
def run() {
......@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Wait for the user class to Finish
userThread.join()
// Finish the ApplicationMaster
finishApplicationMaster()
// TODO: Exit based on success/failure
System.exit(0)
}
......@@ -124,17 +122,30 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
}
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
.getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
// Copy
var mainArgs: Array[String] = new Array[String](args.userArgs.size())
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
mainMethod.invoke(null, mainArgs)
var successed = false
try {
// Copy
var mainArgs: Array[String] = new Array[String](args.userArgs.size())
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
mainMethod.invoke(null, mainArgs)
// some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
// userThread will stop here unless it has uncaught exception thrown out
// It need shutdown hook to set SUCCEEDED
successed = true
} finally {
if (successed) {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
} else {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
}
}
}
}
t.start()
......@@ -179,7 +190,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("All workers have launched.")
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
if (userThread.isAlive){
if (userThread.isAlive) {
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
......@@ -197,7 +208,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
val t = new Thread {
override def run() {
while (userThread.isAlive){
while (userThread.isAlive) {
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
......@@ -235,14 +246,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
*/
def finishApplicationMaster() {
def finishApplicationMaster(status: FinalApplicationStatus) {
synchronized {
if (isFinished) {
return
}
isFinished = true
}
logInfo("finishApplicationMaster with " + status)
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
// TODO: Check if the application has failed or succeeded
finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED)
finishReq.setFinishApplicationStatus(status)
resourceManager.finishApplicationMaster(finishReq)
}
}
......@@ -256,7 +276,7 @@ object ApplicationMaster {
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
if (count >= ALLOCATOR_LOOP_WAIT_COUNT){
if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.synchronized {
// to wake threads off wait ...
yarnAllocatorLoop.notifyAll()
......@@ -291,14 +311,16 @@ object ApplicationMaster {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
// best case ...
for (master <- applicationMasters) master.finishApplicationMaster
for (master <- applicationMasters) {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
}
} )
}
// Wait for initialization to complete and atleast 'some' nodes can get allocated
yarnAllocatorLoop.synchronized {
while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT){
while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L)
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment