diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 18a1c52ae53fb0c0a75ebcb9747971c3471d1e5c..915ef81b4eae384dd867a27a15c0d5bdd187bc91 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -176,7 +176,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
     packagesExclusions = Option(packagesExclusions)
       .orElse(sparkProperties.get("spark.jars.excludes")).orNull
-    deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
+    deployMode = Option(deployMode)
+      .orElse(sparkProperties.get("spark.submit.deployMode"))
+      .orElse(env.get("DEPLOY_MODE"))
+      .orNull
     numExecutors = Option(numExecutors)
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
     keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index d494b0caab85f2e2028a5bce4653831595def48c..2626f5a16dfb8137375dd37bc09ddf0559e443a2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -136,6 +136,47 @@ class SparkSubmitSuite
     appArgs.childArgs should be (Seq("--master", "local", "some", "--weird", "args"))
   }
 
+  test("specify deploy mode through configuration") {
+    val clArgs = Seq(
+      "--master", "yarn",
+      "--conf", "spark.submit.deployMode=client",
+      "--class", "org.SomeClass",
+      "thejar.jar"
+    )
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (_, _, sysProps, _) = prepareSubmitEnvironment(appArgs)
+
+    appArgs.deployMode should be ("client")
+    sysProps("spark.submit.deployMode") should be ("client")
+
+    // Both the command line and configuration specify the deploy mode; the command line option takes priority
+    val clArgs1 = Seq(
+      "--master", "yarn",
+      "--deploy-mode", "cluster",
+      "--conf", "spark.submit.deployMode=client",
+      "--class", "org.SomeClass",
+      "thejar.jar"
+    )
+    val appArgs1 = new SparkSubmitArguments(clArgs1)
+    val (_, _, sysProps1, _) = prepareSubmitEnvironment(appArgs1)
+
+    appArgs1.deployMode should be ("cluster")
+    sysProps1("spark.submit.deployMode") should be ("cluster")
+
+    // Neither the command line nor configuration specifies the deploy mode; client mode is the default choice
+    val clArgs2 = Seq(
+      "--master", "yarn",
+      "--class", "org.SomeClass",
+      "thejar.jar"
+    )
+    val appArgs2 = new SparkSubmitArguments(clArgs2)
+    appArgs2.deployMode should be (null)
+
+    val (_, _, sysProps2, _) = prepareSubmitEnvironment(appArgs2)
+    appArgs2.deployMode should be ("client")
+    sysProps2("spark.submit.deployMode") should be ("client")
+  }
+
   test("handles YARN cluster mode") {
     val clArgs = Seq(
       "--deploy-mode", "cluster",
diff --git a/docs/configuration.md b/docs/configuration.md
index 55cf4b2dac5f5ae640f8699a48e6c3e829137bde..38d3d059f9d315bafc8021191c9547f9a7b2a6b1 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -48,7 +48,7 @@ The following format is accepted:
     1y (years)
 
 
-Properties that specify a byte size should be configured with a unit of size. 
+Properties that specify a byte size should be configured with a unit of size.
 The following format is accepted:
 
     1b (bytes)
@@ -192,6 +192,15 @@ of the most common options to set are:
     <a href="submitting-applications.html#master-urls"> allowed master URL's</a>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.submit.deployMode</code></td>
+  <td>(none)</td>
+  <td>
+    The deploy mode of the Spark driver program, either "client" or "cluster",
+    which means to launch the driver program locally ("client")
+    or remotely ("cluster") on one of the nodes inside the cluster.
+  </td>
+</tr>
 </table>
 
 Apart from these, the following properties are also available, and may be useful in some situations:
@@ -1095,7 +1104,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.rpc.lookupTimeout</code></td>
   <td>120s</td>
   <td>
-    Duration for an RPC remote endpoint lookup operation to wait before timing out. 
+    Duration for an RPC remote endpoint lookup operation to wait before timing out.
   </td>
 </tr>
 </table>
@@ -1559,7 +1568,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.streaming.stopGracefullyOnShutdown</code></td>
   <td>false</td>
   <td>
-    If <code>true</code>, Spark shuts down the <code>StreamingContext</code> gracefully on JVM 
+    If <code>true</code>, Spark shuts down the <code>StreamingContext</code> gracefully on JVM
     shutdown rather than immediately.
   </td>
 </tr>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index dd1c93af6ca4ce4eebeafaed9a13309473dea6b6..20e6003a00c19493a8c153f9513f0e3b6a15bdd2 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -40,6 +40,9 @@ public class SparkLauncher {
   /** The Spark master. */
   public static final String SPARK_MASTER = "spark.master";
 
+  /** The Spark deploy mode. */
+  public static final String DEPLOY_MODE = "spark.submit.deployMode";
+
   /** Configuration key for the driver memory. */
   public static final String DRIVER_MEMORY = "spark.driver.memory";
   /** Configuration key for the driver class path. */
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 312df0b269f32543b6e594d80ebc73013df25e3e..a95f0f17517d16146335fdae6fd221b37233d7b8 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -294,10 +294,11 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
 
   private boolean isClientMode(Map<String, String> userProps) {
     String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
+    String userDeployMode = firstNonEmpty(deployMode, userProps.get(SparkLauncher.DEPLOY_MODE));
     // Default master is "local[*]", so assume client mode in that case.
     return userMaster == null ||
-      "client".equals(deployMode) ||
-      (!userMaster.equals("yarn-cluster") && deployMode == null);
+      "client".equals(userDeployMode) ||
+      (!userMaster.equals("yarn-cluster") && userDeployMode == null);
   }
 
   /**
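
With this change, the deploy mode can also be set programmatically through the launcher library, not just via --deploy-mode or the DEPLOY_MODE environment variable. Below is a minimal sketch of driving the new SparkLauncher.DEPLOY_MODE constant from Scala; the jar path, main class, and object name are placeholders, not part of this patch.

    import org.apache.spark.launcher.SparkLauncher

    object DeployModeExample {
      def main(args: Array[String]): Unit = {
        val process = new SparkLauncher()
          .setMaster("yarn")
          // Equivalent to passing --deploy-mode cluster; an explicit
          // --deploy-mode flag still takes priority over this property.
          .setConf(SparkLauncher.DEPLOY_MODE, "cluster")
          .setAppResource("/path/to/myapp.jar") // placeholder jar path
          .setMainClass("com.example.MyApp")    // placeholder main class
          .launch()
        process.waitFor()
      }
    }

The same precedence applies on the spark-submit command line: --conf spark.submit.deployMode=cluster selects the mode, an explicit --deploy-mode flag overrides it, and client mode remains the default when neither is given, as exercised by the new test above.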