diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index f2be8217a2f8ab00ac2d1082685a055a8391da1a..27a518ccda459dee928520fb98420dfe1e28a696 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -263,9 +263,13 @@ trait ClientBase extends Logging {
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
 
-    // Allow users to specify some environment variables.
-    YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
-      File.pathSeparator)
+    sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
+      // Allow users to specify some environment variables.
+      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs, File.pathSeparator)
+
+      // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments.
+      env("SPARK_YARN_USER_ENV") = userEnvs
+    }
 
     env
   }
@@ -322,6 +326,12 @@ trait ClientBase extends Logging {
       JAVA_OPTS += "-XX:CMSIncrementalDutyCycle=10"
     }
 
+    // SPARK_JAVA_OPTS is deprecated, but for backwards compatibility:
+    sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
+      sparkConf.set("spark.executor.extraJavaOptions", opts)
+      sparkConf.set("spark.driver.extraJavaOptions", opts)
+    }
+
     // TODO: it might be nicer to pass these as an internal environment variable rather than
     // as Java options, due to complications with string parsing of nested quotes.
     if (args.amClass == classOf[ExecutorLauncher].getName) {
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 7d07f6f68046a7eb7c436f13c27f41362cb331af..96f8aa93394f5b202f1a260ec4c9bd030171d7d6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -71,8 +71,8 @@ trait ExecutorRunnableUtil extends Logging {
     /*
         else {
           // If no java_opts specified, default to using -XX:+CMSIncrementalMode
-          // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont
-          // want to mess with it.
+          // It might be possible that other modes/config is being done in spark.executor.extraJavaOptions,
+          // so we dont want to mess with it.
           // In our expts, using (default) throughput collector has severe perf ramnifications in
           // multi-tennent machines
           // The options are based on