diff --git a/bin/spark-class b/bin/spark-class index 3f6beca5becf015537d605f93552d36f9797267d..22acf92288b3b879b41f4c9bc31f010c151e4d33 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -17,6 +17,8 @@ # limitations under the License. # +# NOTE: Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala! + cygwin=false case "`uname`" in CYGWIN*) cygwin=true;; @@ -39,7 +41,7 @@ fi if [ -n "$SPARK_MEM" ]; then echo -e "Warning: SPARK_MEM is deprecated, please use a more specific config option" 1>&2 - echo -e "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." 1>&2 + echo -e "(e.g., spark.executor.memory or spark.driver.memory)." 1>&2 fi # Use SPARK_MEM or 512m as the default memory, to be overridden by specific options @@ -73,11 +75,17 @@ case "$1" in OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM} ;; - # Spark submit uses SPARK_SUBMIT_OPTS and SPARK_JAVA_OPTS - 'org.apache.spark.deploy.SparkSubmit') - OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS \ - -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH" + # Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS + + # SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY. + 'org.apache.spark.deploy.SparkSubmit') + OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS" OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM} + if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then + OUR_JAVA_OPTS="$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH" + fi + if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then + OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY" + fi ;; *) @@ -101,11 +109,12 @@ fi # Set JAVA_OPTS to be able to load native libraries and to set heap size JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS" JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM" + # Load extra JAVA_OPTS from conf/java-opts, if it exists if [ -e "$FWDIR/conf/java-opts" ] ; then JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`" fi -export JAVA_OPTS + # Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala! TOOLS_DIR="$FWDIR"/tools @@ -146,10 +155,28 @@ if $cygwin; then fi export CLASSPATH -if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then - echo -n "Spark Command: " 1>&2 - echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2 - echo -e "========================================\n" 1>&2 +# In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself. +# Here we must parse the properties file for relevant "spark.driver.*" configs before launching +# the driver JVM itself. Instead of handling this complexity in Bash, we launch a separate JVM +# to prepare the launch environment of this driver JVM. + +if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then + # This is used only if the properties file actually contains these special configs + # Export the environment variables needed by SparkSubmitDriverBootstrapper + export RUNNER + export CLASSPATH + export JAVA_OPTS + export OUR_JAVA_MEM + export SPARK_CLASS=1 + shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own + exec "$RUNNER" org.apache.spark.deploy.SparkSubmitDriverBootstrapper "$@" +else + # Note: The format of this command is closely echoed in SparkSubmitDriverBootstrapper.scala + if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then + echo -n "Spark Command: " 1>&2 + echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2 + echo -e "========================================\n" 1>&2 + fi + exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" fi -exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" diff --git a/bin/spark-submit b/bin/spark-submit index 9e7cecedd0325697a47316ea3a4a55463fa0407f..32c911cd0438b687a79e42d4798999290a924374 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -17,14 +17,18 @@ # limitations under the License. # +# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala! + export SPARK_HOME="$(cd `dirname $0`/..; pwd)" ORIG_ARGS=("$@") while (($#)); do if [ "$1" = "--deploy-mode" ]; then - DEPLOY_MODE=$2 + SPARK_SUBMIT_DEPLOY_MODE=$2 + elif [ "$1" = "--properties-file" ]; then + SPARK_SUBMIT_PROPERTIES_FILE=$2 elif [ "$1" = "--driver-memory" ]; then - DRIVER_MEMORY=$2 + export SPARK_SUBMIT_DRIVER_MEMORY=$2 elif [ "$1" = "--driver-library-path" ]; then export SPARK_SUBMIT_LIBRARY_PATH=$2 elif [ "$1" = "--driver-class-path" ]; then @@ -35,10 +39,24 @@ while (($#)); do shift done -DEPLOY_MODE=${DEPLOY_MODE:-"client"} +DEFAULT_PROPERTIES_FILE="$SPARK_HOME/conf/spark-defaults.conf" +export SPARK_SUBMIT_DEPLOY_MODE=${SPARK_SUBMIT_DEPLOY_MODE:-"client"} +export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"} + +# For client mode, the driver will be launched in the same JVM that launches +# SparkSubmit, so we may need to read the properties file for any extra class +# paths, library paths, java options and memory early on. Otherwise, it will +# be too late by the time the driver JVM has started. -if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then - export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY +if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then + # Parse the properties file only if the special configs exist + contains_special_configs=$( + grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \ + grep -v "^[[:space:]]*#" + ) + if [ -n "$contains_special_configs" ]; then + export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1 + fi fi exec $SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}" diff --git a/bin/utils.sh b/bin/utils.sh old mode 100644 new mode 100755 diff --git a/conf/spark-defaults.conf.template b/conf/spark-defaults.conf.template index 2779342769c14300c2b444762f08c202dd4d04e0..94427029b94d7c6d50dd309337449cc3a9ebd9e7 100644 --- a/conf/spark-defaults.conf.template +++ b/conf/spark-defaults.conf.template @@ -2,7 +2,9 @@ # This is useful for setting default environmental settings. # Example: -# spark.master spark://master:7077 -# spark.eventLog.enabled true -# spark.eventLog.dir hdfs://namenode:8021/directory -# spark.serializer org.apache.spark.serializer.KryoSerializer +# spark.master spark://master:7077 +# spark.eventLog.enabled true +# spark.eventLog.dir hdfs://namenode:8021/directory +# spark.serializer org.apache.spark.serializer.KryoSerializer +# spark.driver.memory 5g +# spark.executor.extraJavaOptions -XX:+PrintGCDetail -Dkey=value -Dnumbers="one two three" diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 52c70712eea3dc758bd281a3ee51451bac8394c1..be5ebfa9219d34a92f088205260e022eb3cc2ce7 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -40,28 +40,3 @@ private[spark] object PythonUtils { paths.filter(_ != "").mkString(File.pathSeparator) } } - - -/** - * A utility class to redirect the child process's stdout or stderr. - */ -private[spark] class RedirectThread( - in: InputStream, - out: OutputStream, - name: String) - extends Thread(name) { - - setDaemon(true) - override def run() { - scala.util.control.Exception.ignoring(classOf[IOException]) { - // FIXME: We copy the stream on the level of bytes to avoid encoding problems. - val buf = new Array[Byte](1024) - var len = in.read(buf) - while (len != -1) { - out.write(buf, 0, len) - out.flush() - len = in.read(buf) - } - } - } -} diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index bf716a8ab025b516c8daa58d873dda1680ed0a53..4c4796f6c59bada6902846443cbdbc958fa6b934 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -17,7 +17,6 @@ package org.apache.spark.api.python -import java.lang.Runtime import java.io.{DataOutputStream, DataInputStream, InputStream, OutputStreamWriter} import java.net.{InetAddress, ServerSocket, Socket, SocketException} @@ -25,7 +24,7 @@ import scala.collection.mutable import scala.collection.JavaConversions._ import org.apache.spark._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{RedirectThread, Utils} private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String]) extends Logging { diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index 0d6751f3fa6d2c452fa5188ffc575bdb05b955c6..b66c3ba4d5fb0020f2aed9561f11a8c3363f43cd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -22,8 +22,8 @@ import java.net.URI import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ -import org.apache.spark.api.python.{PythonUtils, RedirectThread} -import org.apache.spark.util.Utils +import org.apache.spark.api.python.PythonUtils +import org.apache.spark.util.{RedirectThread, Utils} /** * A main class used by spark-submit to launch Python applications. It executes python as a diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 318509a67a36fd31130759d09ea9698645772dc0..f8cdbc3c392b551382026cf9b4b77cbc29484ad3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -195,18 +195,21 @@ object SparkSubmit { OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"), // Other options - OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER, - sysProp = "spark.driver.extraClassPath"), - OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER, - sysProp = "spark.driver.extraJavaOptions"), - OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER, - sysProp = "spark.driver.extraLibraryPath"), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.memory"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.cores.max"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, - sysProp = "spark.files") + sysProp = "spark.files"), + + // Only process driver specific options for cluster mode here, + // because they have already been processed in bash for client mode + OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER, + sysProp = "spark.driver.extraClassPath"), + OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER, + sysProp = "spark.driver.extraJavaOptions"), + OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER, + sysProp = "spark.driver.extraLibraryPath") ) // In client mode, launch the application main class directly diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala new file mode 100644 index 0000000000000000000000000000000000000000..af607e6a4a065209421d41709b1970f1fdedde35 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import java.io.File + +import scala.collection.JavaConversions._ + +import org.apache.spark.util.{RedirectThread, Utils} + +/** + * Launch an application through Spark submit in client mode with the appropriate classpath, + * library paths, java options and memory. These properties of the JVM must be set before the + * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity + * of parsing the properties file for such relevant configs in Bash. + * + * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <submit args> + */ +private[spark] object SparkSubmitDriverBootstrapper { + + // Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`. + // Any changes made there must be reflected in this file. + + def main(args: Array[String]): Unit = { + + // This should be called only from `bin/spark-class` + if (!sys.env.contains("SPARK_CLASS")) { + System.err.println("SparkSubmitDriverBootstrapper must be called from `bin/spark-class`!") + System.exit(1) + } + + val submitArgs = args + val runner = sys.env("RUNNER") + val classpath = sys.env("CLASSPATH") + val javaOpts = sys.env("JAVA_OPTS") + val defaultDriverMemory = sys.env("OUR_JAVA_MEM") + + // Spark submit specific environment variables + val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE") + val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE") + val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER") + val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY") + val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH") + val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH") + val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS") + + assume(runner != null, "RUNNER must be set") + assume(classpath != null, "CLASSPATH must be set") + assume(javaOpts != null, "JAVA_OPTS must be set") + assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set") + assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!") + assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set") + assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set") + + // Parse the properties file for the equivalent spark.driver.* configs + val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap + val confDriverMemory = properties.get("spark.driver.memory") + val confLibraryPath = properties.get("spark.driver.extraLibraryPath") + val confClasspath = properties.get("spark.driver.extraClassPath") + val confJavaOpts = properties.get("spark.driver.extraJavaOptions") + + // Favor Spark submit arguments over the equivalent configs in the properties file. + // Note that we do not actually use the Spark submit values for library path, classpath, + // and Java opts here, because we have already captured them in Bash. + + val newDriverMemory = submitDriverMemory + .orElse(confDriverMemory) + .getOrElse(defaultDriverMemory) + + val newLibraryPath = + if (submitLibraryPath.isDefined) { + // SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS + "" + } else { + confLibraryPath.map("-Djava.library.path=" + _).getOrElse("") + } + + val newClasspath = + if (submitClasspath.isDefined) { + // SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH + classpath + } else { + classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("") + } + + val newJavaOpts = + if (submitJavaOpts.isDefined) { + // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS + javaOpts + } else { + javaOpts + confJavaOpts.map(" " + _).getOrElse("") + } + + val filteredJavaOpts = Utils.splitCommandString(newJavaOpts) + .filterNot(_.startsWith("-Xms")) + .filterNot(_.startsWith("-Xmx")) + + // Build up command + val command: Seq[String] = + Seq(runner) ++ + Seq("-cp", newClasspath) ++ + Seq(newLibraryPath) ++ + filteredJavaOpts ++ + Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++ + Seq("org.apache.spark.deploy.SparkSubmit") ++ + submitArgs + + // Print the launch command. This follows closely the format used in `bin/spark-class`. + if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) { + System.err.print("Spark Command: ") + System.err.println(command.mkString(" ")) + System.err.println("========================================\n") + } + + // Start the driver JVM + val filteredCommand = command.filter(_.nonEmpty) + val builder = new ProcessBuilder(filteredCommand) + val process = builder.start() + + // Redirect stdin, stdout, and stderr to/from the child JVM + val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin") + val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout") + val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr") + stdinThread.start() + stdoutThread.start() + stderrThread.start() + + // Terminate on broken pipe, which signals that the parent process has exited. This is + // important for the PySpark shell, where Spark submit itself is a python subprocess. + stdinThread.join() + process.destroy() + } + +} diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d6d74ce26921959d9310cd965e2fa2602ad43751..69a84a3604a5267b61279522dc2b51edd12ae626 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1480,3 +1480,24 @@ private[spark] object Utils extends Logging { } } + +/** + * A utility class to redirect the child process's stdout or stderr. + */ +private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String) + extends Thread(name) { + + setDaemon(true) + override def run() { + scala.util.control.Exception.ignoring(classOf[IOException]) { + // FIXME: We copy the stream on the level of bytes to avoid encoding problems. + val buf = new Array[Byte](1024) + var len = in.read(buf) + while (len != -1) { + out.write(buf, 0, len) + out.flush() + len = in.read(buf) + } + } + } +}