diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8226207de42b8bb21e1d1ba8398293d94ef80387..4ccddc214c8add19126d89d8794d412736ef89c8 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -85,7 +85,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa def run() { val appId = runApp() monitorApplication(appId) - System.exit(0) } def logClusterResourceDetails() { @@ -179,8 +178,17 @@ object Client { System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf - val args = new ClientArguments(argStrings, sparkConf) - new Client(args, sparkConf).run + try { + val args = new ClientArguments(argStrings, sparkConf) + new Client(args, sparkConf).run() + } catch { + case e: Exception => { + Console.err.println(e.getMessage) + System.exit(1) + } + } + + System.exit(0) } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index b2c413b6d267ca1b9d1eeb37d0d78b80aaef7b30..fd3ef9e1fa2de318d320ccb3585c238fd3a24475 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -125,11 +125,11 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { case Nil => if (userClass == null) { - printUsageAndExit(1) + throw new IllegalArgumentException(getUsageMessage()) } case _ => - printUsageAndExit(1, args) + throw new IllegalArgumentException(getUsageMessage(args)) } } @@ -138,11 +138,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { } - def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { - if (unknownParam != null) { - System.err.println("Unknown/unsupported param " + unknownParam) - } - System.err.println( + def getUsageMessage(unknownParam: Any = null): String = { + val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else "" + + message + "Usage: org.apache.spark.deploy.yarn.Client [options] \n" + "Options:\n" + " --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster mode)\n" + @@ -158,8 +157,5 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" + " --files files Comma separated list of files to be distributed with the job.\n" + " --archives archives Comma separated list of archives to be distributed with the job." - ) - System.exit(exitCode) } - } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 29a35680c0e72a246610bb7b3b04307d8836e541..6861b503000cad018f792407fcbe588a33f0c93f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.Records -import org.apache.spark.{Logging, SparkConf, SparkContext} +import org.apache.spark.{SparkException, Logging, SparkConf, SparkContext} /** * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The @@ -79,7 +79,7 @@ trait ClientBase extends Logging { ).foreach { case(cond, errStr) => if (cond) { logError(errStr) - args.printUsageAndExit(1) + throw new IllegalArgumentException(args.getUsageMessage()) } } } @@ -94,15 +94,20 @@ trait ClientBase extends Logging { // If we have requested more then the clusters max for a single resource then exit. if (args.executorMemory > maxMem) { - logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.". - format(args.executorMemory, maxMem)) - System.exit(1) + val errorMessage = + "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster." + .format(args.executorMemory, maxMem) + + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD if (amMem > maxMem) { - logError("Required AM memory (%d) is above the max threshold (%d) of this cluster". - format(args.amMemory, maxMem)) - System.exit(1) + + val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster." + .format(args.amMemory, maxMem) + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) } // We could add checks to make sure the entire cluster has enough resources but that involves @@ -186,8 +191,9 @@ trait ClientBase extends Logging { val delegTokenRenewer = Master.getMasterPrincipal(conf) if (UserGroupInformation.isSecurityEnabled()) { if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - logError("Can't get Master Kerberos principal for use as renewer") - System.exit(1) + val errorMessage = "Can't get Master Kerberos principal for use as renewer" + logError(errorMessage) + throw new SparkException(errorMessage) } } val dst = new Path(fs.getHomeDirectory(), appStagingDir) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 24027618c1f350dc100bb78cd8809dc505836d70..80a8bceb17269221821b009fc3fa372fa7c618aa 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -95,7 +95,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa def run() { val appId = runApp() monitorApplication(appId) - System.exit(0) } def logClusterResourceDetails() { @@ -186,9 +185,18 @@ object Client { // see Client#setupLaunchEnv(). System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf() - val args = new ClientArguments(argStrings, sparkConf) - new Client(args, sparkConf).run() + try { + val args = new ClientArguments(argStrings, sparkConf) + new Client(args, sparkConf).run() + } catch { + case e: Exception => { + Console.err.println(e.getMessage) + System.exit(1) + } + } + + System.exit(0) } }