From cd739bd756875bd52e9bd8ae801e0ae10a1f6937 Mon Sep 17 00:00:00 2001
From: GuoQiang Li <witgo@qq.com>
Date: Wed, 29 Oct 2014 23:02:58 -0700
Subject: [PATCH] [SPARK-1720][SPARK-1719] use LD_LIBRARY_PATH instead of
 -Djava.library.path

- [X] Standalone
- [X] YARN
- [X] Mesos
- [X]  Mac OS X
- [X] Linux
- [ ]  Windows

This is another implementation about #1031

Author: GuoQiang Li <witgo@qq.com>

Closes #2711 from witgo/SPARK-1719 and squashes the following commits:

c7b26f6 [GuoQiang Li] review commits
4488e41 [GuoQiang Li] Refactoring CommandUtils
a444094 [GuoQiang Li] review commits
40c0b4a [GuoQiang Li] Add buildLocalCommand method
c1a0ddd [GuoQiang Li] fix comments
156ce88 [GuoQiang Li] review commit
38aa377 [GuoQiang Li] Refactor CommandUtils.scala
4269e00 [GuoQiang Li] Refactor SparkSubmitDriverBootstrapper.scala
7a1d634 [GuoQiang Li] use LD_LIBRARY_PATH instead of -Djava.library.path
---
 bin/spark-class                               |  6 +-
 .../scala/org/apache/spark/SparkConf.scala    | 13 ++++
 .../SparkSubmitDriverBootstrapper.scala       | 17 ++---
 .../spark/deploy/worker/CommandUtils.scala    | 68 ++++++++++++++++---
 .../spark/deploy/worker/DriverRunner.scala    | 23 ++-----
 .../spark/deploy/worker/ExecutorRunner.scala  | 26 +++----
 .../mesos/CoarseMesosSchedulerBackend.scala   | 22 +++---
 .../cluster/mesos/MesosSchedulerBackend.scala | 18 ++---
 .../scala/org/apache/spark/util/Utils.scala   | 42 +++++++++++-
 .../spark/deploy/CommandUtilsSuite.scala      | 37 ++++++++++
 .../deploy/worker/ExecutorRunnerTest.scala    |  5 +-
 .../apache/spark/deploy/yarn/ClientBase.scala | 14 +++-
 .../deploy/yarn/ExecutorRunnableUtil.scala    | 11 ++-
 13 files changed, 221 insertions(+), 81 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/deploy/CommandUtilsSuite.scala

diff --git a/bin/spark-class b/bin/spark-class
index 91d858bc06..925367b0dd 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -81,7 +81,11 @@ case "$1" in
     OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS"
     OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
     if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then
-      OUR_JAVA_OPTS="$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
+      if [[ $OSTYPE == darwin* ]]; then
+       export DYLD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$DYLD_LIBRARY_PATH"
+      else
+       export LD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$LD_LIBRARY_PATH"
+      fi
     fi
     if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then
       OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY"
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index dbbcc23305..ad0a9017af 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -244,6 +244,19 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     val executorClasspathKey = "spark.executor.extraClassPath"
     val driverOptsKey = "spark.driver.extraJavaOptions"
     val driverClassPathKey = "spark.driver.extraClassPath"
+    val driverLibraryPathKey = "spark.driver.extraLibraryPath"
+
+    // Used by Yarn in 1.1 and before
+    sys.props.get("spark.driver.libraryPath").foreach { value =>
+      val warning =
+        s"""
+          |spark.driver.libraryPath was detected (set to '$value').
+          |This is deprecated in Spark 1.2+.
+          |
+          |Please instead use: $driverLibraryPathKey
+        """.stripMargin
+      logWarning(warning)
+    }
 
     // Validate spark.executor.extraJavaOptions
     settings.get(executorOptsKey).map { javaOpts =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 0125330589..2b894a796c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -82,17 +82,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
       .orElse(confDriverMemory)
       .getOrElse(defaultDriverMemory)
 
-    val newLibraryPath =
-      if (submitLibraryPath.isDefined) {
-        // SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS
-        ""
-      } else {
-        confLibraryPath.map("-Djava.library.path=" + _).getOrElse("")
-      }
-
     val newClasspath =
       if (submitClasspath.isDefined) {
-        // SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH
         classpath
       } else {
         classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
@@ -114,7 +105,6 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val command: Seq[String] =
       Seq(runner) ++
       Seq("-cp", newClasspath) ++
-      Seq(newLibraryPath) ++
       filteredJavaOpts ++
       Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
       Seq("org.apache.spark.deploy.SparkSubmit") ++
@@ -130,6 +120,13 @@ private[spark] object SparkSubmitDriverBootstrapper {
     // Start the driver JVM
     val filteredCommand = command.filter(_.nonEmpty)
     val builder = new ProcessBuilder(filteredCommand)
+    val env = builder.environment()
+
+    if (submitLibraryPath.isEmpty && confLibraryPath.nonEmpty) {
+      val libraryPaths = confLibraryPath ++ sys.env.get(Utils.libraryPathEnvName)
+      env.put(Utils.libraryPathEnvName, libraryPaths.mkString(sys.props("path.separator")))
+    }
+
     val process = builder.start()
 
     // Redirect stdout and stderr from the child JVM
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 2e9be2a180..aba2e20118 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -20,6 +20,8 @@ package org.apache.spark.deploy.worker
 import java.io.{File, FileOutputStream, InputStream, IOException}
 import java.lang.System._
 
+import scala.collection.Map
+
 import org.apache.spark.Logging
 import org.apache.spark.deploy.Command
 import org.apache.spark.util.Utils
@@ -29,7 +31,29 @@ import org.apache.spark.util.Utils
  */
 private[spark]
 object CommandUtils extends Logging {
-  def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
+
+  /**
+   * Build a ProcessBuilder based on the given parameters.
+   * The `env` argument is exposed for testing.
+   */
+  def buildProcessBuilder(
+    command: Command,
+    memory: Int,
+    sparkHome: String,
+    substituteArguments: String => String,
+    classPaths: Seq[String] = Seq[String](),
+    env: Map[String, String] = sys.env): ProcessBuilder = {
+    val localCommand = buildLocalCommand(command, substituteArguments, classPaths, env)
+    val commandSeq = buildCommandSeq(localCommand, memory, sparkHome)
+    val builder = new ProcessBuilder(commandSeq: _*)
+    val environment = builder.environment()
+    for ((key, value) <- localCommand.environment) {
+      environment.put(key, value)
+    }
+    builder
+  }
+
+  private def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
     val runner = sys.env.get("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
 
     // SPARK-698: do not call the run.cmd script, as process.destroy()
@@ -38,11 +62,41 @@ object CommandUtils extends Logging {
       command.arguments
   }
 
+  /**
+   * Build a command based on the given one, taking into account the local environment
+   * of where this command is expected to run, substitute any placeholders, and append
+   * any extra class paths.
+   */
+  private def buildLocalCommand(
+      command: Command,
+      substituteArguments: String => String,
+      classPath: Seq[String] = Seq[String](),
+      env: Map[String, String]): Command = {
+    val libraryPathName = Utils.libraryPathEnvName
+    val libraryPathEntries = command.libraryPathEntries
+    val cmdLibraryPath = command.environment.get(libraryPathName)
+
+    val newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
+      val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++ env.get(libraryPathName)
+      command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator)))
+    } else {
+      command.environment
+    }
+
+    Command(
+      command.mainClass,
+      command.arguments.map(substituteArguments),
+      newEnvironment,
+      command.classPathEntries ++ classPath,
+      Seq[String](), // library path already captured in environment variable
+      command.javaOpts)
+  }
+
   /**
    * Attention: this must always be aligned with the environment variables in the run scripts and
    * the way the JAVA_OPTS are assembled there.
    */
-  def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
+  private def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
     val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
 
     // Exists for backwards compatibility with older Spark versions
@@ -53,14 +107,6 @@ object CommandUtils extends Logging {
       logWarning("Set SPARK_LOCAL_DIRS for node-specific storage locations.")
     }
 
-    val libraryOpts =
-      if (command.libraryPathEntries.size > 0) {
-        val joined = command.libraryPathEntries.mkString(File.pathSeparator)
-        Seq(s"-Djava.library.path=$joined")
-      } else {
-        Seq()
-      }
-
     // Figure out our classpath with the external compute-classpath script
     val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
     val classPath = Utils.executeAndGetOutput(
@@ -71,7 +117,7 @@ object CommandUtils extends Logging {
     val javaVersion = System.getProperty("java.version")
     val permGenOpt = if (!javaVersion.startsWith("1.8")) Some("-XX:MaxPermSize=128m") else None
     Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
-      permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
+      permGenOpt ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
   }
 
   /** Spawn a thread that will redirect a given stream to a file */
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 3bf0b9492d..28cab36c7b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -76,17 +76,9 @@ private[spark] class DriverRunner(
 
           // Make sure user application jar is on the classpath
           // TODO: If we add ability to submit multiple jars they should also be added here
-          val classPath = driverDesc.command.classPathEntries ++ Seq(s"$localJarFilename")
-          val newCommand = Command(
-            driverDesc.command.mainClass,
-            driverDesc.command.arguments.map(substituteVariables),
-            driverDesc.command.environment,
-            classPath,
-            driverDesc.command.libraryPathEntries,
-            driverDesc.command.javaOpts)
-          val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
-            sparkHome.getAbsolutePath)
-          launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
+          val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
+            sparkHome.getAbsolutePath, substituteVariables, Seq(localJarFilename))
+          launchDriver(builder, driverDir, driverDesc.supervise)
         }
         catch {
           case e: Exception => finalException = Some(e)
@@ -165,11 +157,8 @@ private[spark] class DriverRunner(
     localJarFilename
   }
 
-  private def launchDriver(command: Seq[String], envVars: Map[String, String], baseDir: File,
-                           supervise: Boolean) {
-    val builder = new ProcessBuilder(command: _*).directory(baseDir)
-    envVars.map{ case(k,v) => builder.environment().put(k, v) }
-
+  private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
+    builder.directory(baseDir)
     def initialize(process: Process) = {
       // Redirect stdout and stderr to files
       val stdout = new File(baseDir, "stdout")
@@ -177,7 +166,7 @@ private[spark] class DriverRunner(
 
       val stderr = new File(baseDir, "stderr")
       val header = "Launch Command: %s\n%s\n\n".format(
-        command.mkString("\"", "\" \"", "\""), "=" * 40)
+        builder.command.mkString("\"", "\" \"", "\""), "=" * 40)
       Files.append(header, stderr, UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 030a651469..8ba6a01bbc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.worker
 
 import java.io._
 
+import scala.collection.JavaConversions._
+
 import akka.actor.ActorRef
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
@@ -115,33 +117,21 @@ private[spark] class ExecutorRunner(
     case other => other
   }
 
-  def getCommandSeq = {
-    val command = Command(
-      appDesc.command.mainClass,
-      appDesc.command.arguments.map(substituteVariables),
-      appDesc.command.environment,
-      appDesc.command.classPathEntries,
-      appDesc.command.libraryPathEntries,
-      appDesc.command.javaOpts)
-    CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
-  }
-
   /**
    * Download and run the executor described in our ApplicationDescription
    */
   def fetchAndRunExecutor() {
     try {
       // Launch the process
-      val command = getCommandSeq
+      val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
+        sparkHome.getAbsolutePath, substituteVariables)
+      val command = builder.command()
       logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
-      val builder = new ProcessBuilder(command: _*).directory(executorDir)
-      val env = builder.environment()
-      for ((key, value) <- appDesc.command.environment) {
-        env.put(key, value)
-      }
+
+      builder.directory(executorDir)
       // In case we are running this from within the Spark Shell, avoid creating a "scala"
       // parent process for the executor command
-      env.put("SPARK_LAUNCH_WITH_SCALA", "0")
+      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
       process = builder.start()
       val header = "Spark Executor Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d7f88de4b4..d8c0e2f66d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -31,6 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas
 import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -120,16 +121,18 @@ private[spark] class CoarseMesosSchedulerBackend(
       environment.addVariables(
         Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
     }
-    val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
+    val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
 
-    val libraryPathOption = "spark.executor.extraLibraryPath"
-    val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p")
-    val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
+    // Set the environment variable through a command prefix
+    // to append to the existing value of the variable
+    val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p =>
+      Utils.libraryPathEnvPrefix(Seq(p))
+    }.getOrElse("")
 
     environment.addVariables(
       Environment.Variable.newBuilder()
         .setName("SPARK_EXECUTOR_OPTS")
-        .setValue(extraOpts)
+        .setValue(extraJavaOpts)
         .build())
 
     sc.executorEnvs.foreach { case (key, value) =>
@@ -150,16 +153,17 @@ private[spark] class CoarseMesosSchedulerBackend(
     if (uri == null) {
       val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
       command.setValue(
-        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
-          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId))
+        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
+          prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue,
+          offer.getHostname, numCores, appId))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
       command.setValue(
-        ("cd %s*; " +
+        ("cd %s*; %s " +
           "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
-          .format(basename, driverUrl, offer.getSlaveId.getValue,
+          .format(basename, prefixEnv, driverUrl, offer.getSlaveId.getValue,
             offer.getHostname, numCores, appId))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index e0f2fd622f..8e2faff90f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -98,15 +98,16 @@ private[spark] class MesosSchedulerBackend(
       environment.addVariables(
         Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
     }
-    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
-    val extraLibraryPath = sc.conf.getOption("spark.executor.extraLibraryPath").map { lp =>
-      s"-Djava.library.path=$lp"
-    }
-    val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
+    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
+
+    val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
+      Utils.libraryPathEnvPrefix(Seq(p))
+    }.getOrElse("")
+
     environment.addVariables(
       Environment.Variable.newBuilder()
         .setName("SPARK_EXECUTOR_OPTS")
-        .setValue(extraOpts)
+        .setValue(extraJavaOpts)
         .build())
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
@@ -118,12 +119,13 @@ private[spark] class MesosSchedulerBackend(
       .setEnvironment(environment)
     val uri = sc.conf.get("spark.executor.uri", null)
     if (uri == null) {
-      command.setValue(new File(executorSparkHome, "/sbin/spark-executor").getCanonicalPath)
+      val executorPath = new File(executorSparkHome, "/sbin/spark-executor").getCanonicalPath
+      command.setValue("%s %s".format(prefixEnv, executorPath))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
-      command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
+      command.setValue("cd %s*; %s ./sbin/spark-executor".format(basename, prefixEnv))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     val cpus = Resource.newBuilder()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0daab91143..063895d3c5 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer
 import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
-import org.eclipse.jetty.util.MultiException
-
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
@@ -39,6 +37,7 @@ import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.log4j.PropertyConfigurator
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.eclipse.jetty.util.MultiException
 import org.json4s._
 import tachyon.client.{TachyonFile,TachyonFS}
 
@@ -1381,6 +1380,11 @@ private[spark] object Utils extends Logging {
    */
   val isWindows = SystemUtils.IS_OS_WINDOWS
 
+  /**
+   * Whether the underlying operating system is Mac OS X.
+   */
+  val isMac = SystemUtils.IS_OS_MAC_OSX
+
   /**
    * Pattern for matching a Windows drive, which contains only a single alphabet character.
    */
@@ -1714,6 +1718,40 @@ private[spark] object Utils extends Logging {
     method.invoke(obj, values.toSeq: _*)
   }
 
+  /**
+   * Return the current system LD_LIBRARY_PATH name
+   */
+  def libraryPathEnvName: String = {
+    if (isWindows) {
+      "PATH"
+    } else if (isMac) {
+      "DYLD_LIBRARY_PATH"
+    } else {
+      "LD_LIBRARY_PATH"
+    }
+  }
+
+  /**
+   * Return the prefix of a command that appends the given library paths to the
+   * system-specific library path environment variable. On Unix, for instance,
+   * this returns the string LD_LIBRARY_PATH="path1:path2:$LD_LIBRARY_PATH".
+   */
+  def libraryPathEnvPrefix(libraryPaths: Seq[String]): String = {
+    val libraryPathScriptVar = if (isWindows) {
+      s"%${libraryPathEnvName}%"
+    } else {
+      "$" + libraryPathEnvName
+    }
+    val libraryPath = (libraryPaths :+ libraryPathScriptVar).mkString("\"",
+      File.pathSeparator, "\"")
+    val ampersand = if (Utils.isWindows) {
+      " &"
+    } else {
+      ""
+    }
+    s"$libraryPathEnvName=$libraryPath$ampersand"
+  }
+
 }
 
 /**
diff --git a/core/src/test/scala/org/apache/spark/deploy/CommandUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/CommandUtilsSuite.scala
new file mode 100644
index 0000000000..7915ee75d8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/CommandUtilsSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.apache.spark.deploy.worker.CommandUtils
+import org.apache.spark.util.Utils
+
+import org.scalatest.{FunSuite, Matchers}
+
+class CommandUtilsSuite extends FunSuite with Matchers {
+
+  test("set libraryPath correctly") {
+    val appId = "12345-worker321-9876"
+    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+    val cmd = new Command("mainClass", Seq(), Map(), Seq(), Seq("libraryPathToB"), Seq())
+    val builder = CommandUtils.buildProcessBuilder(cmd, 512, sparkHome, t => t)
+    val libraryPath = Utils.libraryPathEnvName
+    val env = builder.environment
+    env.keySet should contain(libraryPath)
+    assert(env.get(libraryPath).startsWith("libraryPathToB"))
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 5e2592e8d2..1962170629 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.worker
 
 import java.io.File
 
+import scala.collection.JavaConversions._
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
@@ -32,6 +34,7 @@ class ExecutorRunnerTest extends FunSuite {
       Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
       new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
-    assert(er.getCommandSeq.last === appId)
+    val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
+    assert(builder.command().last === appId)
   }
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 8ea0e7cf40..f95d723791 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.util.Utils
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN.
@@ -312,6 +313,10 @@ private[spark] trait ClientBase extends Logging {
 
     val javaOpts = ListBuffer[String]()
 
+    // Set the environment variable through a command prefix
+    // to append to the existing value of the variable
+    var prefixEnv: Option[String] = None
+
     // Add Xmx for AM memory
     javaOpts += "-Xmx" + args.amMemory + "m"
 
@@ -348,8 +353,11 @@ private[spark] trait ClientBase extends Logging {
       sparkConf.getOption("spark.driver.extraJavaOptions")
         .orElse(sys.env.get("SPARK_JAVA_OPTS"))
         .foreach(opts => javaOpts += opts)
-      sparkConf.getOption("spark.driver.libraryPath")
-        .foreach(p => javaOpts += s"-Djava.library.path=$p")
+      val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
+        sys.props.get("spark.driver.libraryPath")).flatten
+      if (libraryPaths.nonEmpty) {
+        prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
+      }
     }
 
     // For log4j configuration to reference
@@ -384,7 +392,7 @@ private[spark] trait ClientBase extends Logging {
         "--num-executors ", args.numExecutors.toString)
 
     // Command for the ApplicationMaster
-    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
+    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
       javaOpts ++ amArgs ++
       Seq(
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 5cb4753de2..88dad0febd 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.Utils
 
 trait ExecutorRunnableUtil extends Logging {
 
@@ -47,6 +48,11 @@ trait ExecutorRunnableUtil extends Logging {
       localResources: HashMap[String, LocalResource]): List[String] = {
     // Extra options for the JVM
     val javaOpts = ListBuffer[String]()
+
+    // Set the environment variable through a command prefix
+    // to append to the existing value of the variable
+    var prefixEnv: Option[String] = None
+
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
     javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
@@ -58,6 +64,9 @@ trait ExecutorRunnableUtil extends Logging {
     sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
       javaOpts += opts
     }
+    sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
+      prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
+    }
 
     javaOpts += "-Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
@@ -101,7 +110,7 @@ trait ExecutorRunnableUtil extends Logging {
     // For log4j configuration to reference
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
-    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java",
+    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java",
       "-server",
       // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
       // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
-- 
GitLab