diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b050dccb6d57fbe9cbb645244b85d34f4219cc08..3d8373d8175eeabe0de22be4ea2197b88b809420 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -27,25 +27,39 @@ import org.apache.spark.executor.ExecutorURLClassLoader
 import org.apache.spark.util.Utils
 
 /**
- * Scala code behind the spark-submit script.  The script handles setting up the classpath with
- * relevant Spark dependencies and provides a layer over the different cluster managers and deploy
- * modes that Spark supports.
+ * Main gateway of launching a Spark application.
+ *
+ * This program handles setting up the classpath with relevant Spark dependencies and provides
+ * a layer over the different cluster managers and deploy modes that Spark supports.
  */
 object SparkSubmit {
+
+  // Cluster managers
   private val YARN = 1
   private val STANDALONE = 2
   private val MESOS = 4
   private val LOCAL = 8
   private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
 
-  private var clusterManager: Int = LOCAL
+  // Deploy modes
+  private val CLIENT = 1
+  private val CLUSTER = 2
+  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
 
-  /**
-   * Special primary resource names that represent shells rather than application jars.
-   */
+  // Special primary resource names that represent shells rather than application jars.
   private val SPARK_SHELL = "spark-shell"
   private val PYSPARK_SHELL = "pyspark-shell"
 
+  // Exposed for testing
+  private[spark] var exitFn: () => Unit = () => System.exit(-1)
+  private[spark] var printStream: PrintStream = System.err
+  private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
+  private[spark] def printErrorAndExit(str: String) = {
+    printStream.println("Error: " + str)
+    printStream.println("Run with --help for usage help or --verbose for debug output")
+    exitFn()
+  }
+
   def main(args: Array[String]) {
     val appArgs = new SparkSubmitArguments(args)
     if (appArgs.verbose) {
@@ -55,88 +69,80 @@ object SparkSubmit {
     launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
   }
 
-  // Exposed for testing
-  private[spark] var printStream: PrintStream = System.err
-  private[spark] var exitFn: () => Unit = () => System.exit(-1)
-
-  private[spark] def printErrorAndExit(str: String) = {
-    printStream.println("Error: " + str)
-    printStream.println("Run with --help for usage help or --verbose for debug output")
-    exitFn()
-  }
-  private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
-
   /**
-   * @return a tuple containing the arguments for the child, a list of classpath
-   *         entries for the child, a list of system properties, a list of env vars
-   *         and the main class for the child
+   * @return a tuple containing
+   *           (1) the arguments for the child process,
+   *           (2) a list of classpath entries for the child,
+   *           (3) a list of system properties and env vars, and
+   *           (4) the main class for the child
    */
   private[spark] def createLaunchEnv(args: SparkSubmitArguments)
       : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
-    if (args.master.startsWith("local")) {
-      clusterManager = LOCAL
-    } else if (args.master.startsWith("yarn")) {
-      clusterManager = YARN
-    } else if (args.master.startsWith("spark")) {
-      clusterManager = STANDALONE
-    } else if (args.master.startsWith("mesos")) {
-      clusterManager = MESOS
-    } else {
-      printErrorAndExit("Master must start with yarn, mesos, spark, or local")
-    }
-
-    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
-    // and deploy mode, we have some logic to infer the master and deploy mode
-    // from each other if only one is specified, or exit early if they are at odds.
-    if (args.deployMode == null &&
-        (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
-      args.deployMode = "cluster"
-    }
-    if (args.deployMode == "cluster" && args.master == "yarn-client") {
-      printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
-    }
-    if (args.deployMode == "client" &&
-        (args.master == "yarn-standalone" || args.master == "yarn-cluster")) {
-      printErrorAndExit("Deploy mode \"client\" and master \"" + args.master
-        + "\" are not compatible")
-    }
-    if (args.deployMode == "cluster" && args.master.startsWith("yarn")) {
-      args.master = "yarn-cluster"
-    }
-    if (args.deployMode != "cluster" && args.master.startsWith("yarn")) {
-      args.master = "yarn-client"
-    }
-
-    val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster"
 
-    val childClasspath = new ArrayBuffer[String]()
+    // Values to return
     val childArgs = new ArrayBuffer[String]()
+    val childClasspath = new ArrayBuffer[String]()
     val sysProps = new HashMap[String, String]()
     var childMainClass = ""
 
-    val isPython = args.isPython
-    val isYarnCluster = clusterManager == YARN && deployOnCluster
+    // Set the cluster manager
+    val clusterManager: Int = args.master match {
+      case m if m.startsWith("yarn") => YARN
+      case m if m.startsWith("spark") => STANDALONE
+      case m if m.startsWith("mesos") => MESOS
+      case m if m.startsWith("local") => LOCAL
+      case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
+    }
 
-    // For mesos, only client mode is supported
-    if (clusterManager == MESOS && deployOnCluster) {
-      printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
+    // Set the deploy mode; default is client mode
+    var deployMode: Int = args.deployMode match {
+      case "client" | null => CLIENT
+      case "cluster" => CLUSTER
+      case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
     }
 
-    // For standalone, only client mode is supported
-    if (clusterManager == STANDALONE && deployOnCluster) {
-      printErrorAndExit("Cluster deploy mode is currently not supported for standalone clusters.")
+    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
+    // and deploy mode, we have some logic to infer the master and deploy mode
+    // from each other if only one is specified, or exit early if they are at odds.
+    if (clusterManager == YARN) {
+      if (args.master == "yarn-standalone") {
+        printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
+        args.master = "yarn-cluster"
+      }
+      (args.master, args.deployMode) match {
+        case ("yarn-cluster", null) =>
+          deployMode = CLUSTER
+        case ("yarn-cluster", "client") =>
+          printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
+        case ("yarn-client", "cluster") =>
+          printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
+        case (_, mode) =>
+          args.master = "yarn-" + Option(mode).getOrElse("client")
+      }
+
+      // Make sure YARN is included in our build if we're trying to use it
+      if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
+        printErrorAndExit(
+          "Could not load YARN classes. " +
+          "This copy of Spark may not have been compiled with YARN support.")
+      }
     }
 
-    // For shells, only client mode is applicable
-    if (isShell(args.primaryResource) && deployOnCluster) {
-      printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+    // The following modes are not supported or applicable
+    (clusterManager, deployMode) match {
+      case (MESOS, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
+      case (STANDALONE, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.")
+      case (_, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
+      case (_, CLUSTER) if isShell(args.primaryResource) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+      case _ =>
     }
 
     // If we're running a python app, set the main class to our specific python runner
-    if (isPython) {
-      if (deployOnCluster) {
-        printErrorAndExit("Cluster deploy mode is currently not supported for python.")
-      }
+    if (args.isPython) {
       if (args.primaryResource == PYSPARK_SHELL) {
         args.mainClass = "py4j.GatewayServer"
         args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
@@ -152,120 +158,115 @@ object SparkSubmit {
       sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
     }
 
-    // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
-    if (!deployOnCluster) {
-      childMainClass = args.mainClass
-      if (isUserJar(args.primaryResource)) {
-        childClasspath += args.primaryResource
-      }
-    } else if (clusterManager == YARN) {
-      childMainClass = "org.apache.spark.deploy.yarn.Client"
-      childArgs += ("--jar", args.primaryResource)
-      childArgs += ("--class", args.mainClass)
-    }
-
-    // Make sure YARN is included in our build if we're trying to use it
-    if (clusterManager == YARN) {
-      if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
-        printErrorAndExit("Could not load YARN classes. " +
-          "This copy of Spark may not have been compiled with YARN support.")
-      }
-    }
-
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"
 
     // A list of rules to map each argument to system properties or command-line options in
     // each deploy mode; we iterate through these below
     val options = List[OptionAssigner](
-      OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
-      OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
-      OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
-      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
+
+      // All cluster managers
+      OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"),
+      OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"),
+      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
+
+      // Standalone cluster only
+      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
+      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
+
+      // Yarn client only
+      OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
+      OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
+      OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
+      OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
+      OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
+
+      // Yarn cluster only
+      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
+      OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
+      OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
+      OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
+      OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
+      OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
+      OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
+      OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
+      OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
+
+      // Other options
+      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
         sysProp = "spark.driver.extraClassPath"),
-      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
+      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
         sysProp = "spark.driver.extraJavaOptions"),
-      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
+      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
         sysProp = "spark.driver.extraLibraryPath"),
-      OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
-      OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
-      OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
-      OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
-      OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
-      OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
-      OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
-      OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
-      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
+      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT,
         sysProp = "spark.executor.memory"),
-      OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
-      OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
-      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
+      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT,
         sysProp = "spark.cores.max"),
-      OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
-      OptionAssigner(args.files, YARN, true, clOption = "--files"),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
-      OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
-      OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
-      OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
-      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
+        sysProp = "spark.files")
     )
 
-    // For client mode make any added jars immediately visible on the classpath
-    if (args.jars != null && !deployOnCluster) {
-      for (jar <- args.jars.split(",")) {
-        childClasspath += jar
+    // In client mode, launch the application main class directly
+    // In addition, add the main application jar and any added jars (if any) to the classpath
+    if (deployMode == CLIENT) {
+      childMainClass = args.mainClass
+      if (isUserJar(args.primaryResource)) {
+        childClasspath += args.primaryResource
       }
+      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
+      if (args.childArgs != null) { childArgs ++= args.childArgs }
     }
 
+
     // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
-      if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
+      if (opt.value != null &&
+          (deployMode & opt.deployMode) != 0 &&
           (clusterManager & opt.clusterManager) != 0) {
-        if (opt.clOption != null) {
-          childArgs += (opt.clOption, opt.value)
-        }
-        if (opt.sysProp != null) {
-          sysProps.put(opt.sysProp, opt.value)
-        }
+        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
+        if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
       }
     }
 
     // Add the application jar automatically so the user doesn't have to call sc.addJar
     // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
     // For python files, the primary resource is already distributed as a regular file
-    if (!isYarnCluster && !isPython) {
-      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
+    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
+    if (!isYarnCluster && !args.isPython) {
+      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
       if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
       }
       sysProps.put("spark.jars", jars.mkString(","))
     }
 
-    // Standalone cluster specific configurations
-    if (deployOnCluster && clusterManager == STANDALONE) {
+    // In standalone-cluster mode, use Client as a wrapper around the user class
+    if (clusterManager == STANDALONE && deployMode == CLUSTER) {
+      childMainClass = "org.apache.spark.deploy.Client"
       if (args.supervise) {
         childArgs += "--supervise"
       }
-      childMainClass = "org.apache.spark.deploy.Client"
       childArgs += "launch"
       childArgs += (args.master, args.primaryResource, args.mainClass)
+      if (args.childArgs != null) {
+        childArgs ++= args.childArgs
+      }
     }
 
-    // Arguments to be passed to user program
-    if (args.childArgs != null) {
-      if (!deployOnCluster || clusterManager == STANDALONE) {
-        childArgs ++= args.childArgs
-      } else if (clusterManager == YARN) {
-        for (arg <- args.childArgs) {
-          childArgs += ("--arg", arg)
-        }
+    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
+    if (clusterManager == YARN && deployMode == CLUSTER) {
+      childMainClass = "org.apache.spark.deploy.yarn.Client"
+      childArgs += ("--jar", args.primaryResource)
+      childArgs += ("--class", args.mainClass)
+      if (args.childArgs != null) {
+        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
       }
     }
 
     // Read from default spark properties, if any
     for ((k, v) <- args.getDefaultSparkProperties) {
-      if (!sysProps.contains(k)) sysProps(k) = v
+      sysProps.getOrElseUpdate(k, v)
     }
 
     (childArgs, childClasspath, sysProps, childMainClass)
@@ -364,6 +365,6 @@ object SparkSubmit {
 private[spark] case class OptionAssigner(
     value: String,
     clusterManager: Int,
-    deployOnCluster: Boolean,
+    deployMode: Int,
     clOption: String = null,
     sysProp: String = null)