diff --git a/docs/configuration.md b/docs/configuration.md
index 25adea210cba08f969baedb511981cf504b7f1f1..5e7556c08ee3659d2db24af3dea8f3239e998e55 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -206,6 +206,14 @@ Apart from these, the following properties are also available, and may be useful
     used during aggregation goes above this amount, it will spill the data into disks.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
+  <td>(none)</td>
+  <td>
+    Add the environment variable specified by <code>EnvironmentVariableName</code> to the
+    Executor process. The user can specify multiple of these to set multiple environment variables.
+  </td>
+</tr>
 </table>
 
 #### Shuffle Behavior
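
For context, a minimal sketch of how a user would consume the new config (the app name and environment variable values here are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Each spark.executorEnv.* entry becomes an environment variable
// in every executor process.
val conf = new SparkConf()
  .setAppName("env-example")                     // hypothetical app name
  .set("spark.executorEnv.JAVA_HOME", "/jdk64")  // executors see JAVA_HOME=/jdk64
  .set("spark.executorEnv.FOO", "bar")           // executors see FOO=bar
val sc = new SparkContext(conf)
```
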
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 573930dbf4e54781ea07a1ad746d0de5fed0d25c..9bc20dbf926b276bdaeae2111a416984933480d1 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -17,10 +17,6 @@ To build Spark yourself, refer to the [building with Maven guide](building-with-
 
 Most of the configs are the same for Spark on YARN as for other deployment modes. See the [configuration page](configuration.html) for more information on those.  These are configs that are specific to Spark on YARN.
 
-#### Environment Variables
-
-* `SPARK_YARN_USER_ENV`, to add environment variables to the Spark processes launched on YARN. This can be a comma separated list of environment variables, e.g. `SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"`.
-
 #### Spark Properties
 
 <table class="table">
@@ -110,7 +106,23 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   <td><code>spark.yarn.access.namenodes</code></td>
   <td>(none)</td>
   <td>
-    A list of secure HDFS namenodes your Spark application is going to access. For example, `spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032`. The Spark application must have acess to the namenodes listed and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters.
+    A list of secure HDFS namenodes your Spark application is going to access. For 
+    example, `spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032`. 
+    The Spark application must have access to the namenodes listed and Kerberos must 
+    be properly configured to be able to access them (either in the same realm or in 
+    a trusted realm). Spark acquires security tokens for each of the namenodes so that 
+    the Spark application can access those remote HDFS clusters.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.appMasterEnv.[EnvironmentVariableName]</code></td>
+  <td>(none)</td>
+  <td>
+     Add the environment variable specified by <code>EnvironmentVariableName</code> to the
+     Application Master process launched on YARN. The user can specify multiple of
+     these to set multiple environment variables. In yarn-cluster mode this controls
+     the environment of the Spark driver and in yarn-client mode it only controls
+     the environment of the executor launcher.
   </td>
 </tr>
 </table>
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 44e025b8f60bafe5c01d2768d7eaff3e85d4e7a9..1da0a1b6755543e718291b249db365aef2523ab5 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -259,6 +259,13 @@ trait ClientBase extends Logging {
     localResources
   }
 
+  /** Get all application master environment variables set on this SparkConf */
+  def getAppMasterEnv: Seq[(String, String)] = {
+    val prefix = "spark.yarn.appMasterEnv."
+    sparkConf.getAll.filter { case (k, _) => k.startsWith(prefix) }
+      .map { case (k, v) => (k.substring(prefix.length), v) }
+  }
+
   def setupLaunchEnv(
       localResources: HashMap[String, LocalResource],
       stagingDir: String): HashMap[String, String] = {
@@ -276,6 +284,11 @@ trait ClientBase extends Logging {
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
 
+    getAppMasterEnv.foreach { case (key, value) =>
+      YarnSparkHadoopUtil.addToEnvironment(env, key, value, File.pathSeparator)
+    }
+
+    // Keep this for backwards compatibility but users should move to the config
     sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
       // Allow users to specify some environment variables.
       YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs, File.pathSeparator)
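
To make the prefix-stripping in `getAppMasterEnv` concrete, here is a self-contained sketch of the same transformation outside the trait (only `SparkConf` and its methods are real; the keys and values are examples):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf(loadDefaults = false)
  .set("spark.yarn.appMasterEnv.JAVA_HOME", "/jdk64")
  .set("spark.yarn.appMasterEnv.FOO", "bar")

// Select the prefixed entries and strip the prefix, leaving bare
// environment variable names paired with their values.
val prefix = "spark.yarn.appMasterEnv."
val amEnv = conf.getAll
  .filter { case (k, _) => k.startsWith(prefix) }
  .map { case (k, v) => (k.substring(prefix.length), v) }

amEnv.foreach { case (k, v) => println(s"$k=$v") }
// JAVA_HOME=/jdk64
// FOO=bar
```
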
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 4ba7133a959ed62869ec93cb691cbedb083b228d..71a9e42846b2b08210d1fbd6e376a71c99308abb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -171,7 +171,11 @@ trait ExecutorRunnableUtil extends Logging {
     val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
     ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
 
-    // Allow users to specify some environment variables
+    sparkConf.getExecutorEnv.foreach { case (key, value) =>
+      YarnSparkHadoopUtil.addToEnvironment(env, key, value, File.pathSeparator)
+    }
+
+    // Keep this for backwards compatibility but users should move to the config
     YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
       File.pathSeparator)
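
On the executor side, `SparkConf.getExecutorEnv` already performs the equivalent prefix-stripping for `spark.executorEnv.*`, so no local helper is needed here. A quick sketch (using a single entry, since the ordering of `getAll` is otherwise unspecified):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf(loadDefaults = false)
  .set("spark.executorEnv.FOO", "bar")

// getExecutorEnv strips the spark.executorEnv. prefix, mirroring
// getAppMasterEnv in ClientBase.
assert(conf.getExecutorEnv == Seq(("FOO", "bar")))
```
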