diff --git a/.rat-excludes b/.rat-excludes
index 9e9abb3f10bbf97af2530524100b85cad3780911..50766954ef07033fc62ffe8c56425c87a9a6f8aa 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -11,6 +11,7 @@ RELEASE
 control
 docs
 fairscheduler.xml.template
+spark-defaults.conf.template
 log4j.properties
 log4j.properties.template
 metrics.properties.template
diff --git a/bin/run-example b/bin/run-example
index 5af95a08c6c41f51eee55829c2b321017c4e73fd..b2999198a8d4161efae3b3bb98533cda7605171f 100755
--- a/bin/run-example
+++ b/bin/run-example
@@ -75,7 +75,6 @@ fi
 
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="$SPARK_JAVA_OPTS"
-JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
 # Load extra JAVA_OPTS from conf/java-opts, if it exists
 if [ -e "$FWDIR/conf/java-opts" ] ; then
   JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
diff --git a/bin/spark-class b/bin/spark-class
index 1b0d309cc5b1c0d37a24327cef441293d5862e8b..6871e180c9fa8ec4d2751d08545de7f19f62e151 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -98,7 +98,7 @@ fi
 
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 JAVA_OPTS="$OUR_JAVA_OPTS"
-JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
+JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$_SPARK_LIBRARY_PATH"
 JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
 # Load extra JAVA_OPTS from conf/java-opts, if it exists
 if [ -e "$FWDIR/conf/java-opts" ] ; then
diff --git a/bin/spark-submit b/bin/spark-submit
index d92d55a032bd5d9cf82cd622903ef84f0c350c0b..498d0b27bacdfcb6131bbe75614763dccc16d618 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -25,8 +25,13 @@ while (($#)); do
     DEPLOY_MODE=$2
   elif [ $1 = "--driver-memory" ]; then
     DRIVER_MEMORY=$2
+  elif [ $1 = "--driver-library-path" ]; then
+    export _SPARK_LIBRARY_PATH=$2
+  elif [ $1 = "--driver-class-path" ]; then
+    export SPARK_CLASSPATH="$SPARK_CLASSPATH:$2"
+  elif [ $1 = "--driver-java-options" ]; then
+    export SPARK_JAVA_OPTS="$SPARK_JAVA_OPTS $2"
   fi
-
   shift
 done
 
diff --git a/conf/spark-defaults.conf.template b/conf/spark-defaults.conf.template
new file mode 100644
index 0000000000000000000000000000000000000000..f840ff681d019e747e83cd5c18d7b1cf43db08d5
--- /dev/null
+++ b/conf/spark-defaults.conf.template
@@ -0,0 +1,7 @@
+# Default system properties included when running spark-submit.
+# This is useful for setting default site-wide configuration properties.
+
+# Example:
+# spark.master            spark://master:7077
+# spark.eventLog.enabled  true
+# spark.eventLog.dir      hdfs://namenode:8021/directory
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index 6432a566089bec04681bd307ee144a4383973616..177a21cc0377fda8f3fdc3b59de6decc2ea9afff 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -1,22 +1,41 @@
 #!/usr/bin/env bash
 
-# This file contains environment variables required to run Spark. Copy it as
-# spark-env.sh and edit that to configure Spark for your site.
-#
-# The following variables can be set in this file:
+# This file is sourced when running various Spark programs.
+# Copy it as spark-env.sh and edit that to configure Spark for your site.
+
+# Options read when launching programs locally with 
+# ./bin/run-example or ./bin/spark-submit
+# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
+# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
+# - SPARK_CLASSPATH, default classpath entries to append
+
+# Options read by executors and drivers running inside the cluster
 # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
+# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
+# - SPARK_CLASSPATH, default classpath entries to append
+# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
 # - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos
-# - SPARK_JAVA_OPTS, to set node-specific JVM options for Spark. Note that
-#   we recommend setting app-wide options in the application's driver program.
-#     Examples of node-specific options : -Dspark.local.dir, GC options
-#     Examples of app-wide options : -Dspark.serializer
-#
-# If using the standalone deploy mode, you can also set variables for it here:
+
+# Options read in YARN client mode
+# - SPARK_EXECUTOR_INSTANCES, Number of executors to start (Default: 2)
+# - SPARK_EXECUTOR_CORES, Number of cores per executor (Default: 1)
+# - SPARK_EXECUTOR_MEMORY, Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+# - SPARK_DRIVER_MEMORY, Memory for the driver (e.g. 1000M, 2G) (Default: 512M)
+# - SPARK_YARN_APP_NAME, The name of your application (Default: Spark)
+# - SPARK_YARN_QUEUE, The Hadoop queue to use for allocation requests (Default: 'default')
+# - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job.
+# - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job.
+
+# Options for the daemons used in the standalone deploy mode:
 # - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
 # - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
+# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
 # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
-# - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
+# - SPARK_WORKER_MEMORY, to set how much total memory workers have available to give to executors (e.g. 1000m, 2g)
 # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
 # - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
 # - SPARK_WORKER_DIR, to set the working directory of worker processes
-# - SPARK_PUBLIC_DNS, to set the public dns name of the master
+# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
+# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
+# - SPARK_DAEMON_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
+# - SPARK_PUBLIC_DNS, to set the public DNS name of the master or workers
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b947feb891ee63157a1c36428721efa66b231b0d..bd21fdc5a18e49936b2a634c41668afc849bbaac 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -208,6 +208,82 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     new SparkConf(false).setAll(settings)
   }
 
+  /** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
+    * idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
+  private[spark] def validateSettings() {
+    if (settings.contains("spark.local.dir")) {
+      val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
+        "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
+      logWarning(msg)
+    }
+
+    val executorOptsKey = "spark.executor.extraJavaOptions"
+    val executorClasspathKey = "spark.executor.extraClassPath"
+    val driverOptsKey = "spark.driver.extraJavaOptions"
+    val driverClassPathKey = "spark.driver.extraClassPath"
+
+    // Validate spark.executor.extraJavaOptions
+    settings.get(executorOptsKey).foreach { javaOpts =>
+      if (javaOpts.contains("-Dspark")) {
+        val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts)'. " +
+          "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
+        throw new SparkException(msg)
+      }
+      if (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")) {
+        val msg = s"$executorOptsKey is not allowed to alter memory settings (was '$javaOpts'). " +
+          "Use spark.executor.memory instead."
+        throw new SparkException(msg)
+      }
+    }
+
+    // Check for legacy configs
+    sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
+      val error =
+        s"""
+          |SPARK_JAVA_OPTS was detected (set to '$value').
+          |This has undefined behavior when running on a cluster and is deprecated in Spark 1.0+.
+          |
+          |Please instead use:
+          | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
+          | - ./spark-submit with --driver-java-options to set -X options for a driver
+          | - spark.executor.extraJavaOptions to set -X options for executors
+          | - SPARK_DAEMON_OPTS to set java options for standalone daemons (i.e. master, worker)
+        """.stripMargin
+      logError(error)
+
+      for (key <- Seq(executorOptsKey, driverOptsKey)) {
+        if (getOption(key).isDefined) {
+          throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
+        } else {
+          logWarning(s"Setting '$key' to '$value' as a work-around.")
+          set(key, value)
+        }
+      }
+    }
+
+    sys.env.get("SPARK_CLASSPATH").foreach { value =>
+      val error =
+        s"""
+          |SPARK_CLASSPATH was detected (set to '$value').
+          |This has undefined behavior when running on a cluster and is deprecated in Spark 1.0+.
+          |
+          |Please instead use:
+          | - ./spark-submit with --driver-class-path to augment the driver classpath
+          | - spark.executor.extraClassPath to augment the executor classpath
+        """.stripMargin
+      logError(error)
+
+      for (key <- Seq(executorClasspathKey, driverClassPathKey)) {
+        if (getOption(key).isDefined) {
+          throw new SparkException(s"Found both $key and SPARK_CLASSPATH. Use only the former.")
+        } else {
+          logWarning(s"Setting '$key' to '$value' as a work-around.")
+          set(key, value)
+        }
+      }
+    }
+  }
+
   /**
    * Return a string listing all keys and values, one per line. This is useful to print the
    * configuration out for debugging.
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d3ef75bc7335aa9841d27c9cd62b1d63817f1986..7933d68d67d9691f1533386f786965f177698cfc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -148,6 +148,7 @@ class SparkContext(config: SparkConf) extends Logging {
     this(master, appName, sparkHome, jars, Map(), Map())
 
   private[spark] val conf = config.clone()
+  conf.validateSettings()
 
   /**
    * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
@@ -159,7 +160,7 @@ class SparkContext(config: SparkConf) extends Logging {
     throw new SparkException("A master URL must be set in your configuration")
   }
   if (!conf.contains("spark.app.name")) {
-    throw new SparkException("An application must be set in your configuration")
+    throw new SparkException("An application name must be set in your configuration")
   }
 
   if (conf.getBoolean("spark.logConf", false)) {
@@ -170,11 +171,11 @@ class SparkContext(config: SparkConf) extends Logging {
   conf.setIfMissing("spark.driver.host", Utils.localHostName())
   conf.setIfMissing("spark.driver.port", "0")
 
-  val jars: Seq[String] = if (conf.contains("spark.jars")) {
-    conf.get("spark.jars").split(",").filter(_.size != 0)
-  } else {
-    null
-  }
+  val jars: Seq[String] =
+    conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
+
+  val files: Seq[String] =
+    conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
 
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
@@ -235,6 +236,10 @@ class SparkContext(config: SparkConf) extends Logging {
     jars.foreach(addJar)
   }
 
+  if (files != null) {
+    files.foreach(addFile)
+  }
+
   private def warnSparkMem(value: String): String = {
     logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
       "deprecated, please use spark.executor.memory instead.")
@@ -247,22 +252,20 @@ class SparkContext(config: SparkConf) extends Logging {
     .map(Utils.memoryStringToMb)
     .getOrElse(512)
 
-  // Environment variables to pass to our executors
-  private[spark] val executorEnvs = HashMap[String, String]()
-  for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS");
-      value <- Option(System.getenv(key))) {
-    executorEnvs(key) = value
-  }
+  // Environment variables to pass to our executors.
+  // NOTE: This should only be used for test related settings.
+  private[spark] val testExecutorEnvs = HashMap[String, String]()
+
   // Convert java options to env vars as a work around
   // since we can't set env vars directly in sbt.
-  for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing"))
+  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
     value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
-    executorEnvs(envKey) = value
+    testExecutorEnvs(envKey) = value
   }
   // The Mesos scheduler backend relies on this environment variable to set executor memory.
   // TODO: Set this only in the Mesos scheduler.
-  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
-  executorEnvs ++= conf.getExecutorEnv
+  testExecutorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
+  testExecutorEnvs ++= conf.getExecutorEnv
 
   // Set SPARK_USER for user who is running SparkContext.
   val sparkUser = Option {
@@ -270,7 +273,7 @@ class SparkContext(config: SparkConf) extends Logging {
   }.getOrElse {
     SparkContext.SPARK_UNKNOWN_USER
   }
-  executorEnvs("SPARK_USER") = sparkUser
+  testExecutorEnvs("SPARK_USER") = sparkUser
 
   // Create and start the scheduler
   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 8fd2c7e95b966d09146cbc2503e83cc63efca05e..7ead1171525d282bd1fb3ff474a73e450facc482 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -54,8 +54,21 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
         System.getenv().foreach{case (k, v) => env(k) = v}
 
         val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
+
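+        // Forward any driver extra class path, library path, and Java options to the driver command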
+        val classPathConf = "spark.driver.extraClassPath"
+        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
+          cp.split(java.io.File.pathSeparator)
+        }
+
+        val libraryPathConf = "spark.driver.extraLibraryPath"
+        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
+          cp.split(java.io.File.pathSeparator)
+        }
+
+        val javaOptionsConf = "spark.driver.extraJavaOptions"
+        val javaOpts = sys.props.get(javaOptionsConf)
         val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
-          driverArgs.driverOptions, env)
+          driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
 
         val driverDescription = new DriverDescription(
           driverArgs.jarUrl,
diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala
index fa8af9a6467509a35f9c58d7b98782abf358aec9..32f3ba385084fa98e3cb233e4b268bf1d91ceecd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Command.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -22,5 +22,8 @@ import scala.collection.Map
 private[spark] case class Command(
     mainClass: String,
     arguments: Seq[String],
-    environment: Map[String, String]) {
+    environment: Map[String, String],
+    classPathEntries: Seq[String],
+    libraryPathEntries: Seq[String],
+    extraJavaOptions: Option[String] = None) {
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e5d593cade8b3c3ba40441cf4e71e6c36eceff36..1b1e0fce0e0dfc5578b47d71bca239f8d07fbc52 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.deploy
 
-import java.io.{PrintStream, File}
+import java.io.{File, PrintStream}
 import java.net.{URI, URL}
 
-import org.apache.spark.executor.ExecutorURLClassLoader
+import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
+import org.apache.spark.executor.ExecutorURLClassLoader
 
 /**
  * Scala code behind the spark-submit script.  The script handles setting up the classpath with
@@ -63,7 +61,8 @@ object SparkSubmit {
   /**
    * @return
    *         a tuple containing the arguments for the child, a list of classpath
-   *         entries for the child, and the main class for the child
+   *         entries for the child, a map of system properties, and the main
+   *         class for the child
    */
   private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
       ArrayBuffer[String], Map[String, String], String) = {
@@ -123,6 +122,12 @@ object SparkSubmit {
 
     val options = List[OptionAssigner](
       new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+      new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
+        sysProp = "spark.driver.extraClassPath"),
+      new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
+        sysProp = "spark.driver.extraJavaOptions"),
+      new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
+        sysProp = "spark.driver.extraLibraryPath"),
       new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
       new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
       new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
@@ -142,10 +147,14 @@ object SparkSubmit {
       new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
       new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
       new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
-      new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars")
+      new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"),
+      new OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
+      new OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
+      new OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false,
+        sysProp = "spark.app.name")
     )
 
-    // more jars
+    // For client mode, make any added jars immediately visible on the classpath
     if (appArgs.jars != null && !deployOnCluster) {
       for (jar <- appArgs.jars.split(",")) {
         childClasspath += jar
@@ -163,6 +172,14 @@ object SparkSubmit {
       }
     }
 
+    // For standalone mode, add the application jar automatically so the user doesn't have to
+    // call sc.addJar. TODO: do the same for standalone cluster mode
+    if (clusterManager == STANDALONE) {
+      val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
+      sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
+      println("SPARK JARS" + sysProps.get("spark.jars"))
+    }
+
     if (deployOnCluster && clusterManager == STANDALONE) {
       if (appArgs.supervise) {
         childArgs += "--supervise"
@@ -173,7 +190,7 @@ object SparkSubmit {
       childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass)
     }
 
-    // args
+    // Arguments to be passed to user program
     if (appArgs.childArgs != null) {
       if (!deployOnCluster || clusterManager == STANDALONE) {
         childArgs ++= appArgs.childArgs
@@ -184,6 +201,10 @@ object SparkSubmit {
       }
     }
 
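+    // Apply defaults from the properties file without overriding values that are already set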
+    for ((k, v) <- appArgs.getDefaultSparkProperties) {
+      if (!sysProps.contains(k)) sysProps(k) = v
+    }
+
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
@@ -191,11 +212,11 @@ object SparkSubmit {
       sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
 
     if (verbose) {
-      System.err.println(s"Main class:\n$childMainClass")
-      System.err.println(s"Arguments:\n${childArgs.mkString("\n")}")
-      System.err.println(s"System properties:\n${sysProps.mkString("\n")}")
-      System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
-      System.err.println("\n")
+      printStream.println(s"Main class:\n$childMainClass")
+      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
+      printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
+      printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
+      printStream.println("\n")
     }
 
     val loader = new ExecutorURLClassLoader(new Array[URL](0),
@@ -226,6 +247,10 @@ object SparkSubmit {
   }
 }
 
+/**
+ * Provides an indirection layer for passing arguments as system properties or flags to
+ * the user's driver program or to downstream launcher tools.
+ */
 private[spark] class OptionAssigner(val value: String,
   val clusterManager: Int,
   val deployOnCluster: Boolean,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 834b3df2f164ba76aa2c1cbf607f7cbb37cbf843..02502adfbd0c4d16d5b568982d03574f73ea9224 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,18 +17,28 @@
 
 package org.apache.spark.deploy
 
-import scala.collection.mutable.ArrayBuffer
+import java.io.{File, FileInputStream, IOException}
+import java.util.Properties
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
+import org.apache.spark.SparkException
 
 /**
  * Parses and encapsulates arguments from the spark-submit script.
  */
 private[spark] class SparkSubmitArguments(args: Array[String]) {
-  var master: String = "local"
+  var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
   var executorCores: String = null
   var totalExecutorCores: String = null
+  var propertiesFile: String = null
   var driverMemory: String = null
+  var driverExtraClassPath: String = null
+  var driverExtraLibraryPath: String = null
+  var driverExtraJavaOptions: String = null
   var driverCores: String = null
   var supervise: Boolean = false
   var queue: String = null
@@ -42,42 +52,102 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
   var jars: String = null
   var verbose: Boolean = false
 
-  loadEnvVars()
   parseOpts(args.toList)
+  loadDefaults()
+  checkRequiredArguments()
+
+  /** Return the default Spark properties present in the currently defined defaults file. */
+  def getDefaultSparkProperties = {
+    val defaultProperties = new HashMap[String, String]()
+    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
+    Option(propertiesFile).foreach { filename =>
+      val file = new File(filename)
+      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
+        if (k.startsWith("spark")) {
+          defaultProperties(k) = v
+          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
+        }
+        else {
+          SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
+        }
+      }
+    }
+    defaultProperties
+  }
+
+  /** Fill in any undefined values based on the current properties file or built-in defaults. */
+  private def loadDefaults() = {
+
+    // Use common defaults file, if not specified by user
+    if (propertiesFile == null) {
+      sys.env.get("SPARK_HOME").foreach { sparkHome =>
+        val sep = File.separator
+        val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf"
+        val file = new File(defaultPath)
+        if (file.exists()) {
+          propertiesFile = file.getAbsolutePath
+        }
+      }
+    }
+
+    val defaultProperties = getDefaultSparkProperties
+    // Use properties file as fallback for values which have a direct analog to
+    // arguments in this script.
+    master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+    executorMemory = Option(executorMemory)
+      .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+    executorCores = Option(executorCores)
+      .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+    totalExecutorCores = Option(totalExecutorCores)
+      .getOrElse(defaultProperties.get("spark.cores.max").orNull)
+    name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
+    jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
 
-  // Sanity checks
-  if (args.length == 0) printUsageAndExit(-1)
-  if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
-  if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+    // This supports env vars in older versions of Spark
+    master = Option(master).getOrElse(System.getenv("MASTER"))
+    deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
+
+    // Global defaults. These should be kept to a minimum to avoid confusing behavior.
+    master = Option(master).getOrElse("local")
+  }
+
+  /** Ensure that required fields exist. Call this only once all defaults are loaded. */
+  private def checkRequiredArguments() = {
+    if (args.length == 0) printUsageAndExit(-1)
+    if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
+    if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+  }
 
   override def toString =  {
     s"""Parsed arguments:
-    |  master             $master
-    |  deployMode         $deployMode
-    |  executorMemory     $executorMemory
-    |  executorCores      $executorCores
-    |  totalExecutorCores $totalExecutorCores
-    |  driverMemory       $driverMemory
-    |  drivercores        $driverCores
-    |  supervise          $supervise
-    |  queue              $queue
-    |  numExecutors       $numExecutors
-    |  files              $files
-    |  archives           $archives
-    |  mainClass          $mainClass
-    |  primaryResource    $primaryResource
-    |  name               $name
-    |  childArgs          [${childArgs.mkString(" ")}]
-    |  jars               $jars
-    |  verbose            $verbose
+    |  master                  $master
+    |  deployMode              $deployMode
+    |  executorMemory          $executorMemory
+    |  executorCores           $executorCores
+    |  totalExecutorCores      $totalExecutorCores
+    |  propertiesFile          $propertiesFile
+    |  driverMemory            $driverMemory
+    |  driverCores             $driverCores
+    |  driverExtraClassPath    $driverExtraClassPath
+    |  driverExtraLibraryPath  $driverExtraLibraryPath
+    |  driverExtraJavaOptions  $driverExtraJavaOptions
+    |  supervise               $supervise
+    |  queue                   $queue
+    |  numExecutors            $numExecutors
+    |  files                   $files
+    |  archives                $archives
+    |  mainClass               $mainClass
+    |  primaryResource         $primaryResource
+    |  name                    $name
+    |  childArgs               [${childArgs.mkString(" ")}]
+    |  jars                    $jars
+    |  verbose                 $verbose
+    |
+    |Default properties from $propertiesFile:
+    |${getDefaultSparkProperties.mkString("  ", "\n  ", "\n")}
     """.stripMargin
   }
 
-  private def loadEnvVars() {
-    Option(System.getenv("MASTER")).map(master = _)
-    Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
-  }
-
   private def parseOpts(opts: List[String]): Unit = opts match {
     case ("--name") :: value :: tail =>
       name = value
@@ -122,6 +192,22 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
       driverCores = value
       parseOpts(tail)
 
+    case ("--driver-class-path") :: value :: tail =>
+      driverExtraClassPath = value
+      parseOpts(tail)
+
+    case ("--driver-java-options") :: value :: tail =>
+      driverExtraJavaOptions = value
+      parseOpts(tail)
+
+    case ("--driver-library-path") :: value :: tail =>
+      driverExtraLibraryPath = value
+      parseOpts(tail)
+
+    case ("--properties-file") :: value :: tail =>
+      propertiesFile = value
+      parseOpts(tail)
+
     case ("--supervise") :: tail =>
       supervise = true
       parseOpts(tail)
@@ -154,6 +240,18 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
       parseOpts(tail)
 
     case value :: tail =>
+      if (value.startsWith("-")) {
+        val errMessage = s"Unrecognized option '$value'."
+        val suggestion: Option[String] = value match {
+          case v if v.startsWith("--") && v.contains("=") =>
+            val parts = v.split("=")
+            Some(s"Perhaps you want '${parts(0)} ${parts(1)}'?")
+          case _ =>
+            None
+        }
+        SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
+      }
+
       if (primaryResource != null) {
         val error = s"Found two conflicting resources, $value and $primaryResource." +
           " Expecting only one resource."
@@ -178,11 +276,21 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
         |  --class CLASS_NAME          Name of your app's main class (required for Java apps).
         |  --arg ARG                   Argument to be passed to your application's main class. This
         |                              option can be specified multiple times for multiple args.
-        |  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 512M).
         |  --name NAME                 The name of your application (Default: 'Spark').
         |  --jars JARS                 A comma-separated list of local jars to include on the
         |                              driver classpath and that SparkContext.addJar will work
         |                              with. Doesn't work on standalone with 'cluster' deploy mode.
+        |  --files FILES               Comma separated list of files to be placed in the working dir
+        |                              of each executor.
+        |  --properties-file FILE      Path to a file from which to load extra properties. If not
+        |                              specified, this will look for conf/spark-defaults.conf.
+        |
+        |  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 512M).
+        |  --driver-java-options       Extra Java options to pass to the driver.
+        |  --driver-library-path       Extra library path entries to pass to the driver.
+        |  --driver-class-path         Extra class path entries to pass to the driver.
+        |
+        |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
         |
         | Spark standalone with cluster deploy mode only:
         |  --driver-cores NUM          Cores for driver (Default: 1).
@@ -193,14 +301,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
         |
         | YARN-only:
         |  --executor-cores NUM        Number of cores per executor (Default: 1).
-        |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
         |  --queue QUEUE_NAME          The YARN queue to submit to (Default: 'default').
         |  --num-executors NUM         Number of executors to (Default: 2).
-        |  --files FILES               Comma separated list of files to be placed in the working dir
-        |                              of each executor.
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
         |                              working dir of each executor.""".stripMargin
     )
     SparkSubmit.exitFn()
   }
 }
+
+object SparkSubmitArguments {
+  /** Load properties present in the given file. */
+  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
+    require(file.exists(), s"Properties file ${file.getName} does not exist")
+    val inputStream = new FileInputStream(file)
+    val properties = new Properties()
+    try {
+      properties.load(inputStream)
+    } catch {
+      case e: IOException =>
+        val message = s"Failed when loading Spark properties file ${file.getName}"
+        throw new SparkException(message, e)
+    } finally {
+      inputStream.close()
+    }
+    properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 63f166d401059af30c15e0f5a881324368827053..888dd45e93c6a73a369a6fa6e406e1e440eaad11 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -49,8 +49,8 @@ private[spark] object TestClient {
     val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
       conf = conf, securityManager = new SecurityManager(conf))
     val desc = new ApplicationDescription(
-      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
-      Some("dummy-spark-home"), "ignored")
+      "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(),
+        Seq()), Some("dummy-spark-home"), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 0c761dfc93a1f28680d5d3c222783ff9f777a55b..9103c885fa96c0e84f4274fbcab9c31d41159566 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -46,21 +46,26 @@ object CommandUtils extends Logging {
    * the way the JAVA_OPTS are assembled there.
    */
   def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
-    val libraryOpts = getEnv("SPARK_LIBRARY_PATH", command)
-      .map(p => List("-Djava.library.path=" + p))
-      .getOrElse(Nil)
-    val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS"))
-      .map(Utils.splitCommandString).getOrElse(Nil)
-    val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil)
     val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
+    // Note, this will coalesce multiple options into a single command component
+    val extraOpts = command.extraJavaOptions.toSeq
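+    // Build a single -Djava.library.path flag from any library path entries attached to the command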
+    val libraryOpts =
+      if (command.libraryPathEntries.size > 0) {
+        val joined = command.libraryPathEntries.mkString(File.pathSeparator)
+        Seq(s"-Djava.library.path=$joined")
+      } else {
+        Seq()
+      }
 
     // Figure out our classpath with the external compute-classpath script
     val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
     val classPath = Utils.executeAndGetOutput(
       Seq(sparkHome + "/bin/compute-classpath" + ext),
       extraEnvironment=command.environment)
+    val userClassPath = command.classPathEntries.mkString(File.pathSeparator)
+    val classPathWithUser = classPath + File.pathSeparator + userClassPath
 
-    Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
+    Seq("-cp", classPathWithUser) ++ libraryOpts ++ extraOpts ++ memoryOpts
   }
 
   /** Spawn a thread that will redirect a given stream to a file */
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index b4df1a0dd47184c12b24d9a460faa63eb3547ca1..f918b42c83bc62c0563cef7b4d91e4b7b83d5371 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.worker
 import java.io._
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.Map
+import scala.collection.Map
 
 import akka.actor.ActorRef
 import com.google.common.base.Charsets
@@ -74,13 +74,17 @@ private[spark] class DriverRunner(
 
           // Make sure user application jar is on the classpath
           // TODO: If we add ability to submit multiple jars they should also be added here
-          val env = Map(driverDesc.command.environment.toSeq: _*)
-          env("SPARK_CLASSPATH") = env.getOrElse("SPARK_CLASSPATH", "") + s":$localJarFilename"
-          val newCommand = Command(driverDesc.command.mainClass,
-            driverDesc.command.arguments.map(substituteVariables), env)
+          val classPath = driverDesc.command.classPathEntries ++ Seq(s"$localJarFilename")
+          val newCommand = Command(
+            driverDesc.command.mainClass,
+            driverDesc.command.arguments.map(substituteVariables),
+            driverDesc.command.environment,
+            classPath,
+            driverDesc.command.libraryPathEntries,
+            driverDesc.command.extraJavaOptions)
           val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
             sparkHome.getAbsolutePath)
-          launchDriver(command, env, driverDir, driverDesc.supervise)
+          launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
         }
         catch {
           case e: Exception => finalException = Some(e)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2edd921066876ae242469d7b3c1d0712fef29556..f94cd685e8eb08b66874c8c3e56f2fca512cc37b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -99,7 +99,9 @@ private[spark] class ExecutorRunner(
 
   def getCommandSeq = {
     val command = Command(appDesc.command.mainClass,
-      appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment)
+      appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment,
+      appDesc.command.classPathEntries, appDesc.command.libraryPathEntries,
+      appDesc.command.extraJavaOptions)
     CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f89b2bffd167649cc0e5dad4d1accc216718a4b2..2bfb9c387e1c95021180989bbf2e686b00c153c2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -64,9 +64,10 @@ private[spark] class Executor(
   // to what Yarn on this system said was available. This will be used later when SparkEnv
   // created.
   if (java.lang.Boolean.valueOf(
-      System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))))
-  {
+      System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
     conf.set("spark.local.dir", getYarnLocalDirs())
+  } else if (sys.env.contains("SPARK_LOCAL_DIRS")) {
+    conf.set("spark.local.dir", sys.env("SPARK_LOCAL_DIRS"))
   }
 
   if (!isLocal) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 936e9db80573d81b75b2bc4963b5ae7fe0317844..9544ca05dca700180da20d42ae9bd6f10090c3a8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -42,11 +42,20 @@ private[spark] class SparkDeploySchedulerBackend(
 
     // The endpoint for executors to talk to us
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      conf.get("spark.driver.host"),  conf.get("spark.driver.port"),
+      conf.get("spark.driver.host"), conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
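+    // Collect executor extra Java options, class path, and library path entries from the config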
+    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
+    val classPathEntries = sys.props.get("spark.executor.extraClassPath").toSeq.flatMap { cp =>
+      cp.split(java.io.File.pathSeparator)
+    }
+    val libraryPathEntries = sys.props.get("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
+      cp.split(java.io.File.pathSeparator)
+    }
+
     val command = Command(
-      "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
+      "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.testExecutorEnvs,
+      classPathEntries, libraryPathEntries, extraJavaOpts)
     val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
       sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 06b041e1fd9a91d94170e2d32a5f01f92ce80ac3..2cd9d6c12eaf79076307e49a75d95257dcf4e133 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -111,7 +111,18 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   def createCommand(offer: Offer, numCores: Int): CommandInfo = {
     val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case (key, value) =>
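+    // Expose the executor extra class path to Mesos executors via the SPARK_CLASSPATH env var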
+    val extraClassPath = conf.getOption("spark.executor.extraClassPath")
+    extraClassPath.foreach { cp =>
+      environment.addVariables(
+        Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
+    }
+    val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
+
+    val libraryPathOption = "spark.executor.extraLibraryPath"
+    val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p")
+    val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
+
+    sc.testExecutorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
         .setName(key)
         .setValue(value)
@@ -123,20 +134,22 @@ private[spark] class CoarseMesosSchedulerBackend(
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
     val uri = conf.get("spark.executor.uri", null)
     if (uri == null) {
       val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
       command.setValue(
-        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
-          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d".format(
+          runScript, extraOpts, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
       command.setValue(
         ("cd %s*; " +
-          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
-          .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d")
+          .format(basename, extraOpts, driverUrl, offer.getSlaveId.getValue,
+            offer.getHostname, numCores))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     command.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index dfdcafe19fb93e337702cc48253173589616e214..c975f312324ed27ac83aa491b7d5ee6bbb9e9080 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -90,7 +90,7 @@ private[spark] class MesosSchedulerBackend(
       "Spark home is not set; set it through the spark.home system " +
       "property, the SPARK_HOME environment variable or the SparkContext constructor"))
     val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case (key, value) =>
+    sc.testExecutorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
         .setName(key)
         .setValue(value)
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 9f2924c23b73c7fbda060678b476e9dbd7707353..bfae32dae0dc54ec2ed3f117e588ded0a5868506 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -87,7 +87,7 @@ class JsonProtocolSuite extends FunSuite {
   }
 
   def createAppDesc(): ApplicationDescription = {
-    val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
+    val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq())
     new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
   }
 
@@ -100,7 +100,7 @@ class JsonProtocolSuite extends FunSuite {
 
   def createDriverCommand() = new Command(
     "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
-    Map(("K1", "V1"), ("K2", "V2"))
+    Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo")
   )
 
   def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
@@ -133,9 +133,12 @@ class JsonProtocolSuite extends FunSuite {
 
   def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
     val Diff(c, a, d) = validateJson diff expectedJson
-    assert(c === JNothing, "Json changed")
-    assert(a === JNothing, "Json added")
-    assert(d === JNothing, "Json deleted")
+    val validatePretty = JsonMethods.pretty(validateJson)
+    val expectedPretty = JsonMethods.pretty(expectedJson)
+    val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty"
+    assert(c === JNothing, s"$errorMessage\nChanged:\n${JsonMethods.pretty(c)}")
+    assert(a === JNothing, s"$errorMessage\nAdded:\n${JsonMethods.pretty(a)}")
+    assert(d === JNothing, s"$errorMessage\nDeleted:\n${JsonMethods.pretty(d)}")
   }
 }
 
@@ -165,7 +168,7 @@ object JsonConstants {
     """
       |{"name":"name","cores":4,"memoryperslave":1234,
       |"user":"%s","sparkhome":"sparkHome",
-      |"command":"Command(mainClass,List(arg1, arg2),Map())"}
+      |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),None)"}
     """.format(System.getProperty("user.name", "<unknown>")).stripMargin
 
   val executorRunnerJsonStr =
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 4e489cd9b66a693c5f12dafdaf91e4f0eeaacb1b..f82d717719b69fc5ce86cac7488ade4e1175c161 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -17,16 +17,16 @@
 
 package org.apache.spark.deploy
 
-import java.io.{OutputStream, PrintStream}
+import java.io.{File, OutputStream, PrintStream}
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkException, TestUtils}
+import org.apache.spark.deploy.SparkSubmit._
+import org.apache.spark.util.Utils
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.deploy.SparkSubmit._
-
-
 class SparkSubmitSuite extends FunSuite with ShouldMatchers {
 
   val noOpOutputStream = new OutputStream {
@@ -42,7 +42,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   /** Returns true if the script exits and the given search string is printed. */
-  def testPrematureExit(input: Array[String], searchString: String): Boolean = {
+  def testPrematureExit(input: Array[String], searchString: String) = {
     val printStream = new BufferPrintStream()
     SparkSubmit.printStream = printStream
 
@@ -60,28 +60,38 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     }
     thread.start()
     thread.join()
-    printStream.lineBuffer.find(s => s.contains(searchString)).size > 0
+    val joined = printStream.lineBuffer.mkString("\n")
+    if (!joined.contains(searchString)) {
+      fail(s"Search string '$searchString' not found in $joined")
+    }
   }
 
   test("prints usage on empty input") {
-    testPrematureExit(Array[String](), "Usage: spark-submit") should be (true)
+    testPrematureExit(Array[String](), "Usage: spark-submit")
   }
 
   test("prints usage with only --help") {
-    testPrematureExit(Array("--help"), "Usage: spark-submit") should be (true)
+    testPrematureExit(Array("--help"), "Usage: spark-submit")
+  }
+
+  test("prints error with unrecognized option") {
+    testPrematureExit(Array("--blarg"), "Unrecognized option '--blarg'")
+    testPrematureExit(Array("-bleg"), "Unrecognized option '-bleg'")
+    testPrematureExit(Array("--master=abc"),
+      "Unrecognized option '--master=abc'. Perhaps you want '--master abc'?")
   }
 
   test("handles multiple binary definitions") {
     val adjacentJars = Array("foo.jar", "bar.jar")
-    testPrematureExit(adjacentJars, "error: Found two conflicting resources") should be (true)
+    testPrematureExit(adjacentJars, "error: Found two conflicting resources")
 
     val nonAdjacentJars =
       Array("foo.jar", "--master", "123", "--class", "abc", "bar.jar")
-    testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources") should be (true)
+    testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources")
   }
 
   test("handle binary specified but not class") {
-    testPrematureExit(Array("foo.jar"), "must specify a main class")
+    testPrematureExit(Array("foo.jar"), "Must specify a main class")
   }
 
   test("handles YARN cluster mode") {
@@ -140,12 +150,11 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
-    print("child args: " + childArgsStr)
     childArgsStr.startsWith("--memory 4g --cores 5 --supervise") should be (true)
     childArgsStr should include ("launch spark://h:p thejar.jar org.SomeClass arg1 arg2")
     mainClass should be ("org.apache.spark.deploy.Client")
     classpath should have length (0)
-    sysProps should have size (0)
+    sysProps should have size (1) // contains --jar entry
   }
 
   test("handles standalone client mode") {
@@ -175,4 +184,80 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.cores.max") should be ("5")
   }
+
+  test("launch simple application with spark-submit") {
+    runSparkSubmit(
+      Seq("unUsed.jar",
+        "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+        "--name", "testApp",
+        "--master", "local"))
+  }
+
+  test("spark submit includes jars passed in through --jar") {
+    val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+    val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
+    val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
+    runSparkSubmit(
+      Seq("unUsed.jar",
+        "--class", JarCreationTest.getClass.getName.stripSuffix("$"),
+        "--name", "testApp",
+        "--master", "local-cluster[2,1,512]",
+        "--jars", jarsString))
+  }
+
+  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
+  def runSparkSubmit(args: Seq[String]): String = {
+    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    Utils.executeAndGetOutput(
+      Seq("./bin/spark-submit") ++ args,
+      new File(sparkHome),
+      Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+  }
+}
+
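+// Helper application launched through spark-submit by the "--jars" test above. It fails if the
+// classes packaged in the extra jars cannot be loaded inside executors.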
+object JarCreationTest {
+  def main(args: Array[String]) {
+    val conf = new SparkConf()
+    val sc = new SparkContext(conf)
+    val result = sc.makeRDD(1 to 100, 10).mapPartitions { x =>
+      var foundClasses = false
+      try {
+        Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader)
+        Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader)
+        foundClasses = true
+      } catch {
+        case _: Throwable => // catch all
+      }
+      Seq(foundClasses).iterator
+    }.collect()
+    if (result.contains(false)) {
+      throw new Exception("Could not load user defined classes inside of executors")
+    }
+  }
+}
+
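+// Helper application launched through spark-submit by the test above. It verifies that
+// spark.master and spark.app.name are seen consistently by the driver and executors.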
+object SimpleApplicationTest {
+  def main(args: Array[String]) {
+    val conf = new SparkConf()
+    val sc = new SparkContext(conf)
+
+    val configs = Seq("spark.master", "spark.app.name")
+    for (config <- configs) {
+      val masterValue = conf.get(config)
+      val executorValues = sc
+        .makeRDD(1 to 100, 10)
+        .map(x => SparkEnv.get.conf.get(config))
+        .collect()
+        .distinct
+      if (executorValues.size != 1) {
+        throw new SparkException(s"Inconsistent values for $config: $executorValues")
+      }
+      val executorValue = executorValues(0)
+      if (executorValue != masterValue) {
+        throw new SparkException(
+          s"Master had $config=$masterValue but executor had $config=$executorValue")
+      }
+    }
+
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index a2c131b0c978789155dce50fd6eb87065865cde3..4633bc3f7f25efb61fba8dbc6a4b9a7abfdff709 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.{Command, DriverDescription}
 
 class DriverRunnerTest extends FunSuite {
   private def createDriverRunner() = {
-    val command = new Command("mainClass", Seq(), Map())
+    val command = new Command("mainClass", Seq(), Map(), Seq(), Seq())
     val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
     new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
       null, "akka://1.2.3.4/worker/")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 3cab8e7b379341def636162693721e55d36251b6..8ae387fa0be6febaaa8a1266d289a40e64fb3d71 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -27,7 +27,8 @@ class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
     val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
-    val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()),
+    val appDesc = new ApplicationDescription("app name", Some(8), 500,
+      Command("foo", Seq(), Map(), Seq(), Seq()),
       sparkHome, "appUiUrl")
     val appId = "12345-worker321-9876"
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
diff --git a/dev/audit-release/audit_release.py b/dev/audit-release/audit_release.py
index fa2f02dfecc75ee64c0ebeaa1a16b397faaca8da..4a816d4101e57b497d0c1e18c649c50a7868f4d8 100755
--- a/dev/audit-release/audit_release.py
+++ b/dev/audit-release/audit_release.py
@@ -114,6 +114,7 @@ os.chdir("blank_sbt_build")
 os.environ["SPARK_VERSION"] = RELEASE_VERSION
 os.environ["SCALA_VERSION"] = SCALA_VERSION
 os.environ["SPARK_RELEASE_REPOSITORY"] = RELEASE_REPOSITORY
+os.environ["SPARK_AUDIT_MASTER"] = "local"
 for module in modules:
   os.environ["SPARK_MODULE"] = module
   ret = run_cmd("sbt clean update", exit_on_failure=False)
diff --git a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
index 53fe43215e40ea3515d5bbf63de375bb0d0a7ca2..a89b0d7d38bf1f659812cb25d8eeff1a3f3cba6c 100644
--- a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
+++ b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
@@ -24,8 +24,13 @@ import org.apache.spark.SparkContext._
 
 object SimpleApp {
   def main(args: Array[String]) {
+    val conf = sys.env.get("SPARK_AUDIT_MASTER") match {
+      case Some(master) => new SparkConf().setAppName("Simple Spark App").setMaster(master)
+      case None => new SparkConf().setAppName("Simple Spark App")
+    }
     val logFile = "input.txt"
-    val sc = new SparkContext("local", "Simple App")
+    val sc = new SparkContext(conf)
+    SparkContext.jarOfClass(this.getClass).foreach(sc.addJar)
     val logData = sc.textFile(logFile, 2).cache()
     val numAs = logData.filter(line => line.contains("a")).count()
     val numBs = logData.filter(line => line.contains("b")).count()
diff --git a/dev/audit-release/sbt_app_graphx/src/main/scala/GraphxApp.scala b/dev/audit-release/sbt_app_graphx/src/main/scala/GraphxApp.scala
index da08e014ebd94a20729cae7f9562e1c95553788c..24c7f8d6672969edef690c938c0846eac46d302e 100644
--- a/dev/audit-release/sbt_app_graphx/src/main/scala/GraphxApp.scala
+++ b/dev/audit-release/sbt_app_graphx/src/main/scala/GraphxApp.scala
@@ -17,14 +17,20 @@
 
 package main.scala
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.SparkContext._
 import org.apache.spark.graphx._
 import org.apache.spark.rdd.RDD
 
 object GraphXApp {
   def main(args: Array[String]) {
-    val sc = new SparkContext("local", "Simple GraphX App")
+    val conf = sys.env.get("SPARK_AUDIT_MASTER") match {
+      case Some(master) => new SparkConf().setAppName("Simple GraphX App").setMaster(master)
+      case None => new SparkConf().setAppName("Simple GraphX App")
+    }
+    val sc = new SparkContext(conf)
+    SparkContext.jarOfClass(this.getClass).foreach(sc.addJar)
+
     val users: RDD[(VertexId, (String, String))] =
       sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                            (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
diff --git a/dev/audit-release/sbt_app_streaming/src/main/scala/StreamingApp.scala b/dev/audit-release/sbt_app_streaming/src/main/scala/StreamingApp.scala
index 3d0722d2ac45edd34ae4a9507eea0c638abe4b80..a1d8971abe9a4f3ffcf867edec4e6efb544b8e8e 100644
--- a/dev/audit-release/sbt_app_streaming/src/main/scala/StreamingApp.scala
+++ b/dev/audit-release/sbt_app_streaming/src/main/scala/StreamingApp.scala
@@ -27,10 +27,12 @@ import org.apache.spark.streaming._
 object SparkStreamingExample {
 
   def main(args: Array[String]) {
-    val conf = new SparkConf(true)
-      .setMaster("local[2]")
-      .setAppName("Streaming test")
+    val conf = sys.env.get("SPARK_AUDIT_MASTER") match {
+      case Some(master) => new SparkConf().setAppName("Simple Streaming App").setMaster(master)
+      case None => new SparkConf().setAppName("Simple Streaming App")
+    }
     val ssc = new StreamingContext(conf, Seconds(1))
+    SparkContext.jarOfClass(this.getClass).foreach(ssc.sparkContext.addJar)
     val seen = ListBuffer[RDD[Int]]()
 
     val rdd1 = ssc.sparkContext.makeRDD(1 to 100, 10)
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index 7f75ea44e4ceac3ac7c3b76a67c371354375a86b..dcc063042628cbdcd954d12fcc03a0bed0bb2ec9 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -50,61 +50,78 @@ The system currently supports three cluster managers:
 In addition, Spark's [EC2 launch scripts](ec2-scripts.html) make it easy to launch a standalone
 cluster on Amazon EC2.
 
-# Launching Applications
-
-The recommended way to launch a compiled Spark application is through the spark-submit script (located in the
-bin directory), which takes care of setting up the classpath with Spark and its dependencies, as well as
-provides a layer over the different cluster managers and deploy modes that Spark supports.  It's usage is
-
-  spark-submit `<app jar>` `<options>`
-
-Where options are any of:
-
-- **\--class** - The main class to run.
-- **\--master** - The URL of the cluster manager master, e.g. spark://host:port, mesos://host:port, yarn,
-  or local.
-- **\--deploy-mode** - "client" to run the driver in the client process or "cluster" to run the driver in
-  a process on the cluster.  For Mesos, only "client" is supported.
-- **\--executor-memory** - Memory per executor (e.g. 1000M, 2G).
-- **\--executor-cores** - Number of cores per executor. (Default: 2)
-- **\--driver-memory** - Memory for driver (e.g. 1000M, 2G)
-- **\--name** - Name of the application.
-- **\--arg** - Argument to be passed to the application's main class. This option can be specified
-  multiple times to pass multiple arguments.
-- **\--jars** - A comma-separated list of local jars to include on the driver classpath and that
-  SparkContext.addJar will work with. Doesn't work on standalone with 'cluster' deploy mode.
-
-The following currently only work for Spark standalone with cluster deploy mode:
-
-- **\--driver-cores** - Cores for driver (Default: 1).
-- **\--supervise** - If given, restarts the driver on failure.
-
-The following only works for Spark standalone and Mesos only:
-
-- **\--total-executor-cores** - Total cores for all executors.
-
-The following currently only work for YARN:
-
-- **\--queue** - The YARN queue to place the application in.
-- **\--files** - Comma separated list of files to be placed in the working dir of each executor.
-- **\--archives** - Comma separated list of archives to be extracted into the working dir of each
-  executor.
-- **\--num-executors** - Number of executors (Default: 2).
-
-The master and deploy mode can also be set with the MASTER and DEPLOY_MODE environment variables.
-Values for these options passed via command line will override the environment variables.
-
-# Shipping Code to the Cluster
-
-The recommended way to ship your code to the cluster is to pass it through SparkContext's constructor,
-which takes a list of JAR files (Java/Scala) or .egg and .zip libraries (Python) to disseminate to
-worker nodes. You can also dynamically add new files to be sent to executors with `SparkContext.addJar`
-and `addFile`.
-
-## URIs for addJar / addFile
-
-- **file:** - Absolute paths and `file:/` URIs are served by the driver's HTTP file server, and every executor
-  pulls the file from the driver HTTP server
+# Bundling and Launching Applications
+
+### Bundling Your Application's Dependencies
+If your code depends on other projects, you will need to package them alongside
+your application in order to distribute the code to a Spark cluster. To do this,
+create an assembly jar (or "uber" jar) containing your code and its dependencies. Both
+[sbt](https://github.com/sbt/sbt-assembly) and
+[Maven](http://maven.apache.org/plugins/maven-shade-plugin/)
+have assembly plugins. When creating assembly jars, list Spark and Hadoop
+as `provided` dependencies; these need not be bundled since they are provided by
+the cluster manager at runtime. Once you have an assembled jar, you can call the `bin/spark-submit`
+script as shown below, passing your jar.
+
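+For example, a minimal, illustrative sketch of an sbt build that marks Spark as `provided`
+(the project name and Scala version below are placeholders):
+
+{% highlight scala %}
+// build.sbt -- a minimal sketch; the project name and Scala version are placeholders
+name := "My Application"
+
+version := "1.0"
+
+scalaVersion := "2.10.4"
+
+// Spark is supplied by the cluster manager at runtime, so it is not bundled
+libraryDependencies += "org.apache.spark" %% "spark-core" % "{{site.SPARK_VERSION}}" % "provided"
+{% endhighlight %}
+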
+For Python, you can use the `pyFiles` argument of SparkContext
+or its `addPyFile` method to add `.py`, `.zip` or `.egg` files to be distributed.
+
+### Launching Applications with ./bin/spark-submit
+
+Once a user application is bundled, it can be launched using the `spark-submit` script located in
+the bin directory. This script takes care of setting up the classpath with Spark and its
+dependencies, and supports the different cluster managers and deploy modes that Spark provides.
+Its usage is
+
+    ./bin/spark-submit <app jar> --class path.to.your.Class [other options..]
+
+To enumerate all options available to `spark-submit` run it with the `--help` flag.
+Here are a few examples of common options:
+
+{% highlight bash %}
+# Run application locally
+./bin/spark-submit my-app.jar \
+  --class my.main.ClassName \
+  --master local[8]
+
+# Run on a Spark cluster
+./bin/spark-submit my-app.jar \
+  --class my.main.ClassName \
+  --master spark://mycluster:7077 \
+  --executor-memory 20G \
+  --total-executor-cores 100
+
+# Run on a YARN cluster (use yarn-client instead of yarn-cluster for client mode)
+HADOOP_CONF_DIR=XX ./bin/spark-submit my-app.jar \
+  --class my.main.ClassName \
+  --master yarn-cluster \
+  --executor-memory 20G \
+  --num-executors 50
+{% endhighlight %}
+
+### Loading Configurations from a File
+
+The `spark-submit` script can load default `SparkConf` values from a properties file and pass them
+onto your application. By default it will read configuration options from
+`conf/spark-defaults.conf`. Any values specified in the file will be passed on to the
+application when run. They can obviate the need for certain flags to `spark-submit`: for
+instance, if the `spark.master` property is set, you can safely omit the
+`--master` flag from `spark-submit`. In general, configuration values explicitly set on a
+`SparkConf` take the highest precedence, then flags passed to `spark-submit`, then values
+in the defaults file.
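+
+For example, a minimal sketch of a `conf/spark-defaults.conf` (the master URL and memory value
+below are placeholders):
+
+    spark.master            spark://mycluster:7077
+    spark.executor.memory   2g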
+
+If you are ever unclear where configuration options are coming from, fine-grained debugging
+information can be printed by adding the `--verbose` option to `./bin/spark-submit`.
+
+### Advanced Dependency Management
+When using `./bin/spark-submit`, jars will be automatically transferred to the cluster. For many
+users this is sufficient. However, advanced users can add jars by calling `addFile` or `addJar`
+on an existing SparkContext. This can be used to distribute JAR files (Java/Scala) or .egg and
+.zip libraries (Python) to executors. Spark uses the following URL schemes to allow different
+strategies for disseminating jars:
+
+- **file:** - Absolute paths and `file:/` URIs are served by the driver's HTTP file server, and
+  every executor pulls the file from the driver HTTP server
 - **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected
 - **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node.  This
   means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker,
@@ -138,6 +155,14 @@ The following table summarizes terms you'll see used to refer to cluster concept
       <td>Application</td>
       <td>User program built on Spark. Consists of a <em>driver program</em> and <em>executors</em> on the cluster.</td>
     </tr>
+    <tr>
+      <td>Application jar</td>
+      <td>
+        A jar containing the user's Spark application. In some cases users will want to create
+        an "uber jar" containing their application along with its dependencies. The user's jar
+        should never include Hadoop or Spark libraries; these will be added at runtime.
+      </td>
+    </tr>
     <tr>
       <td>Driver program</td>
       <td>The process running the main() function of the application and creating the SparkContext</td>
diff --git a/docs/configuration.md b/docs/configuration.md
index a3029837ff0cdd7516a40b30e4a8db3e9ceada55..5a4abca2646b6f58af934f517524d0ad823a95c1 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -73,6 +73,9 @@ there are at least five properties that you will commonly want to control:
     Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored
     on disk. This should be on a fast, local disk in your system. It can also be a comma-separated
     list of multiple directories on different disks.
+
+    NOTE: In Spark 1.0 and later this will be overridden by the SPARK_LOCAL_DIRS (Standalone, Mesos)
+    or LOCAL_DIRS (YARN) environment variables set by the cluster manager.
   </td>
 </tr>
 <tr>
@@ -578,7 +581,7 @@ Apart from these, the following properties are also available, and may be useful
     to consolidate them onto as few nodes as possible. Spreading out is usually better for
     data locality in HDFS, but consolidating is more efficient for compute-intensive workloads. <br/>
     <b>Note:</b> this setting needs to be configured in the standalone cluster master, not in individual
-    applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
+    applications; you can set it through <code>SPARK_MASTER_OPTS</code> in <code>spark-env.sh</code>.
   </td>
 </tr>
 <tr>
@@ -591,7 +594,7 @@ Apart from these, the following properties are also available, and may be useful
     Set this lower on a shared cluster to prevent users from grabbing
     the whole cluster by default. <br/>
     <b>Note:</b> this setting needs to be configured in the standalone cluster master, not in individual
-    applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
+    applications; you can set it through <code>SPARK_MASTER_OPTS</code> in <code>spark-env.sh</code>.
   </td>
 </tr>
 <tr>
@@ -649,6 +652,34 @@ Apart from these, the following properties are also available, and may be useful
     Number of cores to allocate for each task.
   </td>
 </tr>
+<tr>
+  <td>spark.executor.extraJavaOptions</td>
+  <td>(none)</td>
+  <td>
+    A string of extra JVM options to pass to executors, for instance GC settings or other
+    logging. Note that it is illegal to set Spark properties or heap size settings with this
+    option. Spark properties should be set using a SparkConf object or the
+    spark-defaults.conf file used with the spark-submit script. Heap size settings can be set
+    with spark.executor.memory. A sketch of setting these options follows this table.
+  </td>
+</tr>
+<tr>
+  <td>spark.executor.extraClassPath</td>
+  <td>(none)</td>
+  <td>
+    Extra classpath entries to append to the classpath of executors. This exists primarily
+    for backwards-compatibility with older versions of Spark. Users typically should not need
+    to set this option.
+  </td>
+</tr>
+<tr>
+  <td>spark.executor.extraLibraryPath</td>
+  <td>(none)</td>
+  <td>
+    Set a special library path to use when launching executor JVMs.
+  </td>
+</tr>
+
 </table>
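+
+For example, a sketch of setting the executor options above programmatically (the GC flag,
+library path, and application name shown are placeholders):
+
+{% highlight scala %}
+import org.apache.spark.SparkConf
+
+// Illustrative values only; real settings depend on your environment
+val conf = new SparkConf()
+  .setAppName("My application")
+  .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails")
+  .set("spark.executor.extraLibraryPath", "/path/to/native/libs")
+{% endhighlight %}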
 
 ## Viewing Spark Properties
@@ -659,10 +690,9 @@ This is a useful place to check to make sure that your properties have been set
 # Environment Variables
 
 Certain Spark settings can be configured through environment variables, which are read from the `conf/spark-env.sh`
-script in the directory where Spark is installed (or `conf/spark-env.cmd` on Windows). These variables are meant to be for machine-specific settings, such
-as library search paths. While Spark properties can also be set there through `SPARK_JAVA_OPTS`, for per-application settings, we recommend setting
-these properties within the application instead of in `spark-env.sh` so that different applications can use different
-settings.
+script in the directory where Spark is installed (or `conf/spark-env.cmd` on Windows). In Standalone and Mesos modes,
+this file can give machine-specific information such as hostnames. It is also sourced when running local
+Spark applications or submission scripts.
 
 Note that `conf/spark-env.sh` does not exist by default when Spark is installed. However, you can copy
 `conf/spark-env.sh.template` to create it. Make sure you make the copy executable.
@@ -672,13 +702,7 @@ The following variables can be set in `spark-env.sh`:
 * `JAVA_HOME`, the location where Java is installed (if it's not on your default `PATH`)
 * `PYSPARK_PYTHON`, the Python binary to use for PySpark
 * `SPARK_LOCAL_IP`, to configure which IP address of the machine to bind to.
-* `SPARK_LIBRARY_PATH`, to add search directories for native libraries.
-* `SPARK_CLASSPATH`, to add elements to Spark's classpath that you want to be present for _all_ applications.
-   Note that applications can also add dependencies for themselves through `SparkContext.addJar` -- we recommend
-   doing that when possible.
-* `SPARK_JAVA_OPTS`, to add JVM options. This includes Java options like garbage collector settings and any system
-   properties that you'd like to pass with `-D`. One use case is to set some Spark properties differently on this
-   machine, e.g., `-Dspark.local.dir=/disk1,/disk2`.
+* `SPARK_PUBLIC_DNS`, the hostname your Spark program will advertise to other machines.
 * Options for the Spark [standalone cluster scripts](spark-standalone.html#cluster-launch-scripts), such as number of cores
   to use on each machine and maximum memory.
 
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 60e8b1ba0eb46acbef954ccf763b757ee9acbfbe..6b4f4ba4254a252f5445c7a637dfd235f514cd8b 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -99,23 +99,32 @@ scala> linesWithSpark.count()
 res9: Long = 15
 {% endhighlight %}
 
-It may seem silly to use Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `bin/spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark).
+It may seem silly to use Spark to explore and cache a 30-line text file. The interesting part is
+that these same functions can be used on very large data sets, even when they are striped across
+tens or hundreds of nodes. You can also do this interactively by connecting `bin/spark-shell` to
+a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark).
 
-# A Standalone App in Scala
-Now say we wanted to write a standalone application using the Spark API. We will walk through a simple application in both Scala (with SBT), Java (with Maven), and Python. If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.
+# A Standalone Application
+Now say we wanted to write a standalone application using the Spark API. We will walk through a
+simple application in both Scala (with SBT), Java (with Maven), and Python.
 
-We'll create a very simple Spark application in Scala. So simple, in fact, that it's named `SimpleApp.scala`:
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+We'll create a very simple Spark application in Scala. So simple, in fact, that it's
+named `SimpleApp.scala`:
 
 {% highlight scala %}
 /*** SimpleApp.scala ***/
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
+import org.apache.spark.SparkConf
 
 object SimpleApp {
   def main(args: Array[String]) {
-    val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
-    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
-      List("target/scala-{{site.SCALA_BINARY_VERSION}}/simple-project_{{site.SCALA_BINARY_VERSION}}-1.0.jar"))
+    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
+    val conf = new SparkConf().setAppName("Simple Application")
+    val sc = new SparkContext(conf)
     val logData = sc.textFile(logFile, 2).cache()
     val numAs = logData.filter(line => line.contains("a")).count()
     val numBs = logData.filter(line => line.contains("b")).count()
@@ -124,9 +133,17 @@ object SimpleApp {
 }
 {% endhighlight %}
 
-This program just counts the number of lines containing 'a' and the number containing 'b' in the Spark README. Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the program. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the application, the directory where Spark is installed, and a name for the jar file containing the application's code. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes.
+This program just counts the number of lines containing 'a' and the number containing 'b' in the
+Spark README. Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is
+installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext,
+we initialize a SparkContext as part of the program.
+
+We pass the SparkContext constructor a SparkConf object which contains information about our
+application. To make sure that the jar file containing the application is shipped to worker nodes
+when it is launched on a cluster, you can also call `sc.addJar`, as sketched below.
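+
+A minimal sketch (the jar path assumes the sbt layout shown later in this section):
+
+{% highlight scala %}
+// Optional: ship the application jar to worker nodes when running on a cluster
+sc.addJar("target/scala-{{site.SCALA_BINARY_VERSION}}/simple-project_{{site.SCALA_BINARY_VERSION}}-1.0.jar")
+{% endhighlight %}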
 
-This file depends on the Spark API, so we'll also include an sbt configuration file, `simple.sbt` which explains that Spark is a dependency. This file also adds a repository that Spark depends on:
+This file depends on the Spark API, so we'll also include an sbt configuration file, `simple.sbt`
+which explains that Spark is a dependency. This file also adds a repository that Spark depends on:
 
 {% highlight scala %}
 name := "Simple Project"
@@ -140,15 +157,12 @@ libraryDependencies += "org.apache.spark" %% "spark-core" % "{{site.SPARK_VERSIO
 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
 {% endhighlight %}
 
-If you also wish to read data from Hadoop's HDFS, you will also need to add a dependency on `hadoop-client` for your version of HDFS:
-
-{% highlight scala %}
-libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "<your-hdfs-version>"
-{% endhighlight %}
-
-Finally, for sbt to work correctly, we'll need to layout `SimpleApp.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a JAR package containing the application's code, then use `sbt/sbt run` to execute our program.
+For sbt to work correctly, we'll need to lay out `SimpleApp.scala` and `simple.sbt`
+according to the typical directory structure. Once that is in place, we can create a JAR package
+containing the application's code, then use the `spark-submit` script to run our program.
 
 {% highlight bash %}
+# Your directory layout should look like this
 $ find .
 .
 ./simple.sbt
@@ -157,27 +171,36 @@ $ find .
 ./src/main/scala
 ./src/main/scala/SimpleApp.scala
 
-$ sbt/sbt package
-$ sbt/sbt run
+# Package a jar containing your application
+$ sbt package
+...
+[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
+
+# Use spark-submit to run your application
+$ YOUR_SPARK_HOME/bin/spark-submit target/scala-2.10/simple-project_2.10-1.0.jar \
+  --class "SimpleApp" \
+  --master local[4]
 ...
 Lines with a: 46, Lines with b: 23
 {% endhighlight %}
 
-# A Standalone App in Java
-Now say we wanted to write a standalone application using the Java API. We will walk through doing this with Maven. If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.
+</div>
+<div data-lang="java" markdown="1">
+This example will use Maven to compile an application jar, but any similar build system will work.
 
 We'll create a very simple Spark application, `SimpleApp.java`:
 
 {% highlight java %}
 /*** SimpleApp.java ***/
 import org.apache.spark.api.java.*;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.Function;
 
 public class SimpleApp {
   public static void main(String[] args) {
-    String logFile = "$YOUR_SPARK_HOME/README.md"; // Should be some file on your system
-    JavaSparkContext sc = new JavaSparkContext("local", "Simple App",
-      "$YOUR_SPARK_HOME", new String[]{"target/simple-project-1.0.jar"});
+    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
+    SparkConf conf = new SparkConf().setAppName("Simple Application");
+    JavaSparkContext sc = new JavaSparkContext(conf);
     JavaRDD<String> logData = sc.textFile(logFile).cache();
 
     long numAs = logData.filter(new Function<String, Boolean>() {
@@ -193,9 +216,16 @@ public class SimpleApp {
 }
 {% endhighlight %}
 
-This program just counts the number of lines containing 'a' and the number containing 'b' in a text file. Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala example, we initialize a SparkContext, though we use the special `JavaSparkContext` class to get a Java-friendly one. We also create RDDs (represented by `JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes that extend `spark.api.java.function.Function`. The [Java programming guide](java-programming-guide.html) describes these differences in more detail.
+This program just counts the number of lines containing 'a' and the number containing 'b' in a text
+file. Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is installed.
+As with the Scala example, we initialize a SparkContext, though we use the special
+`JavaSparkContext` class to get a Java-friendly one. We also create RDDs (represented by
+`JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes
+that extend `spark.api.java.function.Function`. The
+[Java programming guide](java-programming-guide.html) describes these differences in more detail.
 
-To build the program, we also write a Maven `pom.xml` file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version.
+To build the program, we also write a Maven `pom.xml` file that lists Spark as a dependency.
+Note that Spark artifacts are tagged with a Scala version.
 
 {% highlight xml %}
 <project>
@@ -221,16 +251,6 @@ To build the program, we also write a Maven `pom.xml` file that lists Spark as a
 </project>
 {% endhighlight %}
 
-If you also wish to read data from Hadoop's HDFS, you will also need to add a dependency on `hadoop-client` for your version of HDFS:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.hadoop</groupId>
-  <artifactId>hadoop-client</artifactId>
-  <version>...</version>
-</dependency>
-{% endhighlight %}
-
 We lay out these files according to the canonical Maven directory structure:
 {% highlight bash %}
 $ find .
@@ -241,16 +261,25 @@ $ find .
 ./src/main/java/SimpleApp.java
 {% endhighlight %}
 
-Now, we can execute the application using Maven:
+Now, we can package the application using Maven and execute it with `./bin/spark-submit`.
 
 {% highlight bash %}
+# Package a jar containing your application
 $ mvn package
-$ mvn exec:java -Dexec.mainClass="SimpleApp"
+...
+[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
+
+# Use spark-submit to run your application
+$ YOUR_SPARK_HOME/bin/spark-submit target/simple-project-1.0.jar \
+  --class "SimpleApp" \
+  --master local[4]
 ...
 Lines with a: 46, Lines with b: 23
 {% endhighlight %}
 
-# A Standalone App in Python
+</div>
+<div data-lang="python" markdown="1">
+
 Now we will show how to write a standalone application using the Python API (PySpark).
 
 As an example, we'll create a simple Spark application, `SimpleApp.py`:
@@ -259,7 +288,7 @@ As an example, we'll create a simple Spark application, `SimpleApp.py`:
 """SimpleApp.py"""
 from pyspark import SparkContext
 
-logFile = "$YOUR_SPARK_HOME/README.md"  # Should be some file on your system
+logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
 sc = SparkContext("local", "Simple App")
 logData = sc.textFile(logFile).cache()
 
@@ -270,11 +299,15 @@ print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
 {% endhighlight %}
 
 
-This program just counts the number of lines containing 'a' and the number containing 'b' in a text file.
-Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. 
+This program just counts the number of lines containing 'a' and the number containing 'b' in a
+text file.
+Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is installed.
 As with the Scala and Java examples, we use a SparkContext to create RDDs.
-We can pass Python functions to Spark, which are automatically serialized along with any variables that they reference.
-For applications that use custom classes or third-party libraries, we can add those code dependencies to SparkContext to ensure that they will be available on remote machines; this is described in more detail in the [Python programming guide](python-programming-guide.html).
+We can pass Python functions to Spark, which are automatically serialized along with any variables
+that they reference.
+For applications that use custom classes or third-party libraries, we can add those code
+dependencies to SparkContext to ensure that they will be available on remote machines; this is
+described in more detail in the [Python programming guide](python-programming-guide.html).
 `SimpleApp` is simple enough that we do not need to specify any code dependencies.
 
 We can run this application using the `bin/pyspark` script:
@@ -286,57 +319,12 @@ $ ./bin/pyspark SimpleApp.py
 Lines with a: 46, Lines with b: 23
 {% endhighlight python %}
 
-# Running on a Cluster
-
-There are a few additional considerations when running applicaitons on a 
-[Spark](spark-standalone.html), [YARN](running-on-yarn.html), or 
-[Mesos](running-on-mesos.html) cluster.
-
-### Including Your Dependencies
-If your code depends on other projects, you will need to ensure they are also
-present on the slave nodes. A popular approach is to create an
-assembly jar (or "uber" jar) containing your code and its dependencies. Both
-[sbt](https://github.com/sbt/sbt-assembly) and 
-[Maven](http://maven.apache.org/plugins/maven-assembly-plugin/) 
-have assembly plugins. When creating assembly jars, list Spark 
-itself as a `provided` dependency; it need not be bundled since it is 
-already present on the slaves. Once you have an assembled jar, 
-add it to the SparkContext as shown here. It is also possible to add
-your dependent jars one-by-one using the `addJar` method of `SparkContext`.
-
-For Python, you can use the `pyFiles` argument of SparkContext
-or its `addPyFile` method to add `.py`, `.zip` or `.egg` files to be distributed.
-
-### Setting Configuration Options
-Spark includes several [configuration options](configuration.html#spark-properties)
-that influence the behavior of your application.
-These should be set by building a [SparkConf](api/core/index.html#org.apache.spark.SparkConf)
-object and passing it to the SparkContext constructor.
-For example, in Java and Scala, you can do:
-
-{% highlight scala %}
-import org.apache.spark.{SparkConf, SparkContext}
-val conf = new SparkConf()
-             .setMaster("local")
-             .setAppName("My application")
-             .set("spark.executor.memory", "1g")
-val sc = new SparkContext(conf)
-{% endhighlight %}
-
-Or in Python:
-
-{% highlight scala %}
-from pyspark import SparkConf, SparkContext
-conf = SparkConf()
-conf.setMaster("local")
-conf.setAppName("My application")
-conf.set("spark.executor.memory", "1g"))
-sc = SparkContext(conf = conf)
-{% endhighlight %}
+</div>
+</div>
 
-### Accessing Hadoop Filesystems
+# Where to go from here
+Congratulations on running your first Spark application!
 
-The examples here access a local file. To read data from a distributed
-filesystem, such as HDFS, include 
-[Hadoop version information](index.html#a-note-about-hadoop-versions)
-in your build file. By default, Spark builds against HDFS 1.0.4.
+* For an in-depth overview of the API, see the "Programming Guides" menu section.
+* For running applications on a cluster, head to the [deployment overview](cluster-overview.html).
+* For configuration options available to Spark applications, see the [configuration page](configuration.html).
\ No newline at end of file
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index 2b0a51e9dfc548f3faa921b681e1db18bde813bf..4431da0721ac75848a154ccbcd17c80f56d36449 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -25,36 +25,43 @@ To write a Spark application, you need to add a dependency on Spark. If you use
     artifactId = spark-core_{{site.SCALA_BINARY_VERSION}}
     version = {{site.SPARK_VERSION}}
 
-In addition, if you wish to access an HDFS cluster, you need to add a dependency on `hadoop-client` for your version of HDFS:
+In addition, if you wish to access an HDFS cluster, you need to add a dependency on
+`hadoop-client` for your version of HDFS. Some common HDFS version tags are listed on the
+[third party distributions](hadoop-third-party-distributions.html) page.
 
     groupId = org.apache.hadoop
     artifactId = hadoop-client
     version = <your-hdfs-version>
 
-For other build systems, you can run `sbt/sbt assembly` to pack Spark and its dependencies into one JAR (`assembly/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop*.jar`), then add this to your CLASSPATH. Set the HDFS version as described [here](index.html#a-note-about-hadoop-versions).
-
 Finally, you need to import some Spark classes and implicit conversions into your program. Add the following lines:
 
 {% highlight scala %}
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
+import org.apache.spark.SparkConf
 {% endhighlight %}
 
 # Initializing Spark
 
-The first thing a Spark program must do is to create a `SparkContext` object, which tells Spark how to access a cluster.
-This is done through the following constructor:
+The first thing a Spark program must do is to create a `SparkContext` object, which tells Spark
+how to access a cluster. To create a `SparkContext` you first need to build a `SparkConf` object
+that contains information about your application.
 
 {% highlight scala %}
-new SparkContext(master, appName, [sparkHome], [jars])
+val conf = new SparkConf().setAppName(<app name>).setMaster(<master>)
+new SparkContext(conf)
 {% endhighlight %}
 
-or through `new SparkContext(conf)`, which takes a [SparkConf](api/core/index.html#org.apache.spark.SparkConf)
-object for more advanced configuration.
-
-The `master` parameter is a string specifying a [Spark or Mesos cluster URL](#master-urls) to connect to, or a special "local" string to run in local mode, as described below. `appName` is a name for your application, which will be shown in the cluster web UI. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later.
+The `<master>` parameter is a string specifying a [Spark, Mesos or YARN cluster URL](#master-urls)
+to connect to, or a special "local" string to run in local mode, as described below. `<app name>` is
+a name for your application, which will be shown in the cluster web UI. It's also possible to set
+these values [using a configuration file](cluster-overview.html#loading-configurations-from-a-file),
+which avoids hard-coding the master name in your application.
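+
+For example, a sketch that omits `setMaster` so the master can instead be supplied by
+`spark-submit` or a defaults file:
+
+{% highlight scala %}
+// No setMaster here; pass --master to spark-submit or set spark.master in
+// conf/spark-defaults.conf instead.
+val conf = new SparkConf().setAppName("My application")
+val sc = new SparkContext(conf)
+{% endhighlight %}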
 
-In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable, and you can add JARs to the classpath with the `ADD_JARS` variable. For example, to run `bin/spark-shell` on exactly four cores, use
+In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the
+variable called `sc`. Making your own SparkContext will not work. You can set which master the
+context connects to using the `MASTER` environment variable, and you can add JARs to the classpath
+with the `ADD_JARS` variable. For example, to run `bin/spark-shell` on exactly four cores, use
 
 {% highlight bash %}
 $ MASTER=local[4] ./bin/spark-shell
@@ -83,21 +90,16 @@ The master URL passed to Spark can be in one of the following formats:
         The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use,
         which is 5050 by default.
 </td></tr>
+<tr><td> yarn-client </td><td> Connect to a <a href="running-on-yarn.html"> YARN </a> cluster in
+client mode. The cluster location will be inferred based on the local Hadoop configuration.
+</td></tr>
+<tr><td> yarn-cluster </td><td> Connect to a <a href="running-on-yarn.html"> YARN </a> cluster in
+cluster mode. The cluster location will be inferred based on the local Hadoop configuration.
+</td></tr>
 </table>
 
 If no master URL is specified, the spark shell defaults to "local[*]".
 
-For running on YARN, Spark launches an instance of the standalone deploy cluster within YARN; see [running on YARN](running-on-yarn.html) for details.
-
-### Deploying Code on a Cluster
-
-If you want to run your application on a cluster, you will need to specify the two optional parameters to `SparkContext` to let it find your code:
-
-* `sparkHome`: The path at which Spark is installed on your worker machines (it should be the same on all of them).
-* `jars`: A list of JAR files on the local machine containing your application's code and any dependencies, which Spark will deploy to all the worker nodes. You'll need to package your application into a set of JARs using your build system. For example, if you're using SBT, the [sbt-assembly](https://github.com/sbt/sbt-assembly) plugin is a good way to make a single JAR with your code and dependencies.
-
-If you run `bin/spark-shell` on a cluster, you can add JARs to it by specifying the `ADD_JARS` environment variable before you launch it.  This variable should contain a comma-separated list of JARs. For example, `ADD_JARS=a.jar,b.jar ./bin/spark-shell` will launch a shell with `a.jar` and `b.jar` on its classpath. In addition, any new classes you define in the shell will automatically be distributed.
-
 # Resilient Distributed Datasets (RDDs)
 
 Spark revolves around the concept of a _resilient distributed dataset_ (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are currently two types of RDDs: *parallelized collections*, which take an existing Scala collection and run functions on it in parallel, and *Hadoop datasets*, which run functions on each record of a file in Hadoop distributed file system or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods.
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 6f616fb7c244891e79ccc169c926eddf121e98fe..8e98cc0c80a34bfa5cfaa6c3b577ea4620727071 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -69,7 +69,7 @@ import sqlContext._
 
 </div>
 
-<div data-lang="java"  markdown="1">
+<div data-lang="java" markdown="1">
 
 The entry point into all relational functionality in Spark is the
 [JavaSQLContext](api/sql/core/index.html#org.apache.spark.sql.api.java.JavaSQLContext) class, or one
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f078d06aafad0c2116eeaae3695f74d2662ae4e6..2f74965900baf8f1949f0060dd26409d348718ef 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,6 +39,9 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
+/**
+ * An application master that runs the user's driver program and allocates executors.
+ */
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
                         sparkConf: SparkConf) extends Logging {
 
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0179b0600c61f20703a608ef1c5d75f8befc5b9a..00c7649e68e13ef001219fcaac177021d6536ad7 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -33,7 +33,9 @@ import org.apache.hadoop.yarn.util.{Apps, Records}
 
 import org.apache.spark.{Logging, SparkConf}
 
-
+/**
+ * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
+ */
 class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
   extends YarnClientImpl with ClientBase with Logging {
 
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 21f14576efe8ab0644b0f8d4fb5a9e72465f1f84..ea356f33eb99819d4ac5754a8b49cd559fc04f5b 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -34,6 +34,12 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 
+/**
+ * An application master that allocates executors on behalf of a driver that is running outside
+ * the cluster.
+ *
+ * This is used only in yarn-client mode.
+ */
 class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
   extends Logging {
 
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index d6d46a5f6ce42b1568480ac387cc2e11f3971971..95f0f9d0ff2bcf050647037903635274d286ece6 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -54,6 +54,10 @@ object AllocationType extends Enumeration {
 // Note that right now, we assume all node asks as uniform in terms of capabilities and priority
 // Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
 // more info on how we are requesting for containers.
+
+/**
+ * Acquires resources for executors from a ResourceManager and launches executors in new containers.
+ */
 private[yarn] class YarnAllocationHandler(
     val conf: Configuration,
     val resourceManager: AMRMProtocol, 
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 566de712fc2802d0ed9335c0cd6a83994028355b..c00b63669ca8e5cfe1583e3381940a5b703d9247 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -22,16 +22,13 @@ import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.StringUtils
 import org.apache.hadoop.yarn.api._
@@ -39,19 +36,18 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Records, Apps}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.Utils
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
+import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
- * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
- * which will launch a Spark master process and negotiate resources throughout its duration.
+ * Client submits an application to the YARN ResourceManager.
+ *
+ * Depending on the deployment mode this will launch one of two application master classes:
+ * 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
+ *      which launches a driver program inside of the cluster.
+ * 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
+ *      request executors on behalf of a driver running outside of the cluster.
  */
 trait ClientBase extends Logging {
   val args: ClientArguments
@@ -70,7 +66,6 @@ trait ClientBase extends Logging {
   // TODO(harvey): This could just go in ClientArguments.
   def validateArgs() = {
     Map(
-      (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
       ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
           "Error: You must specify a user jar when running in standalone mode!"),
       (args.userClass == null) -> "Error: You must specify a user class!",
@@ -208,7 +203,7 @@ trait ClientBase extends Logging {
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
 
     Map(
-      ClientBase.SPARK_JAR -> System.getenv("SPARK_JAR"), ClientBase.APP_JAR -> args.userJar,
+      ClientBase.SPARK_JAR -> ClientBase.getSparkJar, ClientBase.APP_JAR -> args.userJar,
       ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
     ).foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -251,8 +246,10 @@ trait ClientBase extends Logging {
     logInfo("Setting up the launch environment")
 
     val env = new HashMap[String, String]()
+
+    val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
     val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
-    ClientBase.populateClasspath(args, yarnConf, sparkConf, log4jConf, env)
+    ClientBase.populateClasspath(yarnConf, sparkConf, log4jConf, env, extraCp)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -268,9 +265,6 @@ trait ClientBase extends Logging {
     YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
       File.pathSeparator)
 
-    // Add each SPARK_* key to the environment.
-    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
-
     env
   }
 
@@ -299,13 +293,13 @@ trait ClientBase extends Logging {
 
     val amMemory = calculateAMMemory(newApp)
 
-    var JAVA_OPTS = ""
+    val JAVA_OPTS = ListBuffer[String]()
 
     // Add Xmx for AM memory
     JAVA_OPTS += "-Xmx" + amMemory + "m"
 
     val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-    JAVA_OPTS += " -Djava.io.tmpdir=" + tmpDir
+    JAVA_OPTS += "-Djava.io.tmpdir=" + tmpDir
 
     // TODO: Remove once cpuset version is pushed out.
     // The context is, default gc for server class machines ends up using all cores to do gc -
@@ -319,35 +313,48 @@ trait ClientBase extends Logging {
     if (useConcurrentAndIncrementalGC) {
       // In our expts, using (default) throughput collector has severe perf ramifications in
       // multi-tenant machines
-      JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
-      JAVA_OPTS += " -XX:+CMSIncrementalMode "
-      JAVA_OPTS += " -XX:+CMSIncrementalPacing "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+      JAVA_OPTS += "-XX:+UseConcMarkSweepGC"
+      JAVA_OPTS += "-XX:+CMSIncrementalMode"
+      JAVA_OPTS += "-XX:+CMSIncrementalPacing"
+      JAVA_OPTS += "-XX:CMSIncrementalDutyCycleMin=0"
+      JAVA_OPTS += "-XX:CMSIncrementalDutyCycle=10"
     }
 
-    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
-      JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
+    // TODO: it might be nicer to pass these as an internal environment variable rather than
+    // as Java options, due to complications with string parsing of nested quotes.
+    if (args.amClass == classOf[ExecutorLauncher].getName) {
+      // If we are being launched in client mode, forward the spark-conf options
+      // onto the executor launcher
+      for ((k, v) <- sparkConf.getAll) {
+        JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+      }
+    } else {
+      // If we are being launched in standalone mode, capture and forward any spark
+      // system properties (e.g. set by spark-class).
+      for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
+        JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+      }
+      sys.props.get("spark.driver.extraJavaOptions").foreach(opts => JAVA_OPTS += opts)
+      sys.props.get("spark.driver.libraryPath").foreach(p => JAVA_OPTS += s"-Djava.library.path=$p")
     }
     JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
-    val commands = List[String](
-      Environment.JAVA_HOME.$() + "/bin/java" +
-        " -server " +
-        JAVA_OPTS +
-        " " + args.amClass +
-        " --class " + args.userClass +
-        " --jar " + args.userJar +
-        userArgsToString(args) +
-        " --executor-memory " + args.executorMemory +
-        " --executor-cores " + args.executorCores +
-        " --num-executors " + args.numExecutors +
-        " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-        " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
-    logInfo("Command for starting the Spark ApplicationMaster: " + commands(0))
-    amContainer.setCommands(commands)
+    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
+      JAVA_OPTS ++
+      Seq(args.amClass, "--class", args.userClass, "--jar", args.userJar,
+        userArgsToString(args),
+        "--executor-memory", args.executorMemory.toString,
+        "--executor-cores", args.executorCores.toString,
+        "--num-executors", args.numExecutors.toString,
+        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+    logInfo("Command for starting the Spark ApplicationMaster: " + commands)
+
+    // TODO: it would be nicer to just make sure there are no null commands here
+    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
+    amContainer.setCommands(printableCommands)
 
     setupSecurityToken(amContainer)
     amContainer
@@ -361,6 +368,8 @@ object ClientBase {
   val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
   val LOCAL_SCHEME = "local"
 
+  def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
+
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
     val classpathEntries = Option(conf.getStrings(
@@ -433,10 +442,9 @@ object ClientBase {
     " -Dlog4j.configuration=" + log4jConf
   }
 
-  def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
-      log4jConf: String, env: HashMap[String, String]) {
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
-      File.pathSeparator)
+  def populateClasspath(conf: Configuration, sparkConf: SparkConf, log4jConf: String,
+      env: HashMap[String, String], extraClassPath: Option[String] = None) {
+
     if (log4jConf != null) {
       // If a custom log4j config file is provided as a local: URI, add its parent directory to the
       // classpath. Note that this only works if the custom config's file name is
@@ -448,19 +456,26 @@ object ClientBase {
           File.pathSeparator)
       }
     }
+
+    /** Add entry to the classpath. */
+    def addClasspathEntry(path: String) =
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, path)
+    /** Add entry to the classpath. Interpreted as a path relative to the working directory. */
+    def addPwdClasspathEntry(entry: String) =
+      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + entry)
+
+    extraClassPath.foreach(addClasspathEntry)
+
+    addClasspathEntry(Environment.PWD.$())
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
-      .toBoolean
-    if (userClasspathFirst) {
-      addUserClasspath(args, env)
-    }
-    addClasspathEntry(System.getenv("SPARK_JAR"), SPARK_JAR, env);
-    ClientBase.populateHadoopClasspath(conf, env)
-    if (!userClasspathFirst) {
-      addUserClasspath(args, env)
+    if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
+      addPwdClasspathEntry(APP_JAR)
+      addPwdClasspathEntry(SPARK_JAR)
+      ClientBase.populateHadoopClasspath(conf, env)
+    } else {
+      addPwdClasspathEntry(SPARK_JAR)
+      ClientBase.populateHadoopClasspath(conf, env)
+      addPwdClasspathEntry(APP_JAR)
     }
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
-      Environment.PWD.$() + Path.SEPARATOR + "*", File.pathSeparator)
+    addPwdClasspathEntry("*")
   }
 
   /**
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 40b38661f794dc62885dcc5e3b1020b10729437d..7d07f6f68046a7eb7c436f13c27f41362cb331af 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -19,26 +19,18 @@ package org.apache.spark.deploy.yarn
 
 import java.io.File
 import java.net.URI
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
-
-import org.apache.spark.{SparkConf, Logging}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
 
+import org.apache.spark.{Logging, SparkConf}
 
 trait ExecutorRunnableUtil extends Logging {
 
@@ -58,8 +50,10 @@ trait ExecutorRunnableUtil extends Logging {
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
     JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
-    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
-      JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+
+    // Set extra Java options for the executor, if defined
+    sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
+      JAVA_OPTS += opts
     }
 
     JAVA_OPTS += " -Djava.io.tmpdir=" +
@@ -162,8 +156,9 @@ trait ExecutorRunnableUtil extends Logging {
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
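+    // Any extra classpath configured for executors is passed through to populateClasspath,
+    // which adds it ahead of the Spark and app jars.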
+    val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
     val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY)
-    ClientBase.populateClasspath(null, yarnConf, sparkConf, log4jConf, env)
+    ClientBase.populateClasspath(yarnConf, sparkConf, log4jConf, env, extraCp)
     if (log4jConf != null) {
       env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf
     }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 161918859e7c47a4ff300dc0ba235862a2113adb..ce2dde0631ed9de15d16bf75faf0617afa6fb4be 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 import scala.collection.mutable.ArrayBuffer
@@ -54,7 +54,7 @@ private[spark] class YarnClientSchedulerBackend(
       "--class", "notused",
       "--jar", null,
       "--args", hostport,
-      "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
+      "--am-class", classOf[ExecutorLauncher].getName
     )
 
     // process any optional arguments, given either as environment variables
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index b225be6a79c0e767378fb5b42c5d6d8da2044fc4..90e807160d4b6508df7eb741b56952066022f684 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,6 +42,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
 
+/**
+ * An application master that runs the user's driver program and allocates executors.
+ */
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
                         sparkConf: SparkConf) extends Logging {
 
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 77eb1276a0c4e5792dc39790cfb273f95a9c26ab..2e2fb5d4fa787a3be946821de4e9f1a1b46ddbf5 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -34,9 +34,7 @@ import org.apache.spark.{Logging, SparkConf}
 
 
 /**
- * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
- * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
- * which will launch a Spark master process and negotiate resources throughout its duration.
+ * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
  */
 class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
   extends YarnClientImpl with ClientBase with Logging {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 67ed591c78bf9c3ae31b8ab52d81417f612e15e4..a14bb377aa133fede811332903d2b2917a4fc5a0 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -35,6 +35,12 @@ import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 
+/**
+ * An application master that allocates executors on behalf of a driver that is running outside
+ * the cluster.
+ *
+ * This is used only in yarn-client mode.
+ */
 class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
   extends Logging {
 
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 4fafae1aff26ffff44cd8272279854591f134c71..a979fe4d62630e401258b171847729435a186cb5 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -56,6 +56,10 @@ object AllocationType extends Enumeration {
 // Note that right now, we assume all node asks are uniform in terms of capabilities and priority.
 // Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
 // more info on how we request containers.
+
+/**
+ * Acquires resources for executors from a ResourceManager and launches executors in new containers.
+ */
 private[yarn] class YarnAllocationHandler(
     val conf: Configuration,
     val amClient: AMRMClient[ContainerRequest],