Commit f95ac686 authored by John Zhao, committed by Xiangrui Meng

[SPARK-1516] Throw exceptions in the YARN client instead of calling System.exit directly.

All the changes are in the package "org.apache.spark.deploy.yarn":
    1) Throw exceptions in ClientArguments and ClientBase instead of exiting directly.
    2) In Client's main method, exit with code 1 if an exception is caught, otherwise exit with code 0.

After the fix, users who integrate the Spark YARN client into their own applications will no longer have the embedding application terminated when an argument is wrong or when the run finishes.
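In outline, the change turns "validate, print, and exit" into "validate and throw", so that only the command-line entry point decides process exit codes. A minimal sketch of that pattern (the object and method names below are illustrative, not the actual Spark classes):

    // Minimal sketch of the pattern this commit introduces (illustrative names,
    // not the actual Spark classes): library code throws on bad input, and only
    // the command-line entry point turns outcomes into process exit codes.
    object LauncherSketch {

      // Library-style validation: signal bad input with an exception so that an
      // application embedding this code keeps control of its own JVM.
      def validateArgs(args: Array[String]): Unit = {
        if (args.isEmpty) {
          throw new IllegalArgumentException("Usage: LauncherSketch <appJar> [options]")
        }
      }

      // CLI entry point: the only place that calls System.exit.
      def main(args: Array[String]): Unit = {
        try {
          validateArgs(args)
          // ... submit and monitor the application here ...
        } catch {
          case e: Exception =>
            Console.err.println(e.getMessage)
            System.exit(1) // failure path: nonzero exit code
        }
        System.exit(0) // success path: explicit zero exit code
      }
    }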

Author: John Zhao <jzhao@alpinenow.com>

Closes #490 from codeboyyong/jira_1516_systemexit_inyarnclient and squashes the following commits:

138cb48 [John Zhao] [SPARK-1516] Throw exceptions in the YARN client instead of calling System.exit directly. All the changes are in the package "org.apache.spark.deploy.yarn": 1) Add a ClientException with an exitCode 2) Throw exceptions in ClientArguments and ClientBase instead of exiting directly 3) In Client's main method, catch the exception and exit with its exitCode.
parent 44daec5a
@@ -85,7 +85,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def logClusterResourceDetails() {
@@ -179,8 +178,17 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
 
     val sparkConf = new SparkConf
-    val args = new ClientArguments(argStrings, sparkConf)
-
-    new Client(args, sparkConf).run
+
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 }
@@ -125,11 +125,11 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
         case Nil =>
           if (userClass == null) {
-            printUsageAndExit(1)
+            throw new IllegalArgumentException(getUsageMessage())
           }
 
         case _ =>
-          printUsageAndExit(1, args)
+          throw new IllegalArgumentException(getUsageMessage(args))
       }
     }
 
@@ -138,11 +138,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   }
 
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
-    if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
-    }
-    System.err.println(
+  def getUsageMessage(unknownParam: Any = null): String = {
+    val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
+
+    message +
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster mode)\n" +
       "  --class CLASS_NAME       Name of your application's main class (required)\n" +
@@ -158,8 +157,5 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
       "  --addJars jars           Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
       "  --files files            Comma separated list of files to be distributed with the job.\n" +
       "  --archives archives      Comma separated list of archives to be distributed with the job."
-      )
-    System.exit(exitCode)
-
   }
 }
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{SparkException, Logging, SparkConf, SparkContext}
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
@@ -79,7 +79,7 @@ trait ClientBase extends Logging {
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
-        args.printUsageAndExit(1)
+        throw new IllegalArgumentException(args.getUsageMessage())
       }
     }
   }
@@ -94,15 +94,20 @@ trait ClientBase extends Logging {
 
     // If we have requested more then the clusters max for a single resource then exit.
     if (args.executorMemory > maxMem) {
-      logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.".
-        format(args.executorMemory, maxMem))
-      System.exit(1)
+      val errorMessage =
+        "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
+          .format(args.executorMemory, maxMem)
+
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
-        format(args.amMemory, maxMem))
-      System.exit(1)
+      val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
+        .format(args.amMemory, maxMem)
+
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
 
     // We could add checks to make sure the entire cluster has enough resources but that involves
@@ -186,8 +191,9 @@ trait ClientBase extends Logging {
     val delegTokenRenewer = Master.getMasterPrincipal(conf)
     if (UserGroupInformation.isSecurityEnabled()) {
       if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        logError("Can't get Master Kerberos principal for use as renewer")
-        System.exit(1)
+        val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+        logError(errorMessage)
+        throw new SparkException(errorMessage)
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
@@ -95,7 +95,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def logClusterResourceDetails() {
@@ -186,9 +185,18 @@ object Client {
 
     // see Client#setupLaunchEnv().
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf()
-    val args = new ClientArguments(argStrings, sparkConf)
-
-    new Client(args, sparkConf).run()
+
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 }
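
For an application that embeds the client, submission failures now surface as catchable exceptions instead of a JVM shutdown. A hypothetical caller (EmbeddingApp and its error handling are illustrative, not part of this commit) might look like:

    // Hypothetical embedding application (illustrative, not part of this commit):
    // because the client now throws instead of calling System.exit, the caller
    // can log the failure and keep running.
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.{Client, ClientArguments}

    object EmbeddingApp {
      def submit(argStrings: Array[String]): Boolean = {
        System.setProperty("SPARK_YARN_MODE", "true") // as Client.main does
        val sparkConf = new SparkConf
        try {
          val args = new ClientArguments(argStrings, sparkConf)
          new Client(args, sparkConf).run()
          true // submission and monitoring completed normally
        } catch {
          case e: Exception =>
            // Before this change, a bad argument would have killed this JVM here.
            Console.err.println("YARN submission failed: " + e.getMessage)
            false
        }
      }
    }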