Skip to content
Snippets Groups Projects
Commit 84979499 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #273 from dennybritz/executorVars

Let the user specify environment variables to be passed to the Executors
parents 5b7ee173 0700d192
No related branches found
No related tags found
No related merge requests found
......@@ -92,6 +92,12 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
// Bookkeeping for files/JARs shipped to executors: path -> timestamp of addition.
private[spark] val addedFiles = HashMap[String, Long]()
private[spark] val addedJars = HashMap[String, Long]()
// Environment variables to pass to our executors.
// Only variables that are actually set are recorded: System.getenv returns
// null for unset keys, and storing nulls here would force every consumer
// (ExecutorRunner, the Mesos backends) to null-check each value.
private[spark] val executorEnvs = HashMap[String, String]()
Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH",
    "SPARK_JAVA_OPTS", "SPARK_TESTING").foreach { key =>
  Option(System.getenv(key)).foreach(value => executorEnvs(key) = value)
}
// Add each JAR given through the constructor
jars.foreach { addJar(_) }
......@@ -433,6 +439,12 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
addedJars.clear()
}
/** Records an environment variable that will be passed to the executors. */
def putExecutorEnv(key: String, value: String) {
  logInfo("Setting executor environment variable " + key + "=" + value)
  executorEnvs(key) = value
}
/** Shut down the SparkContext. */
def stop() {
dagScheduler.stop()
......
......@@ -116,10 +116,12 @@ private[spark] class ExecutorRunner(
// Launch the executor in its own working directory.
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
// Copy the job's requested environment into the child process.
for ((key, value) <- jobDesc.command.environment) {
// NOTE(review): the next line is the pre-change side of this diff; the
// if/else below it is the replacement — in the merged file only the
// null-checked version remains. A null value means the variable was unset
// on the driver; it is skipped and the omission logged.
env.put(key, value)
if (value == null) {
logInfo("Environment variable not set: " + key)
} else {
env.put(key, value)
}
}
// Resource limits advertised to the executor process.
env.put("SPARK_CORES", cores.toString)
env.put("SPARK_MEMORY", memory.toString)
// In case we are running this from within the Spark Shell
// so we are not creating a parent process.
env.put("SPARK_LAUNCH_WITH_SCALA", "0")
......
......@@ -20,15 +20,6 @@ private[spark] class SparkDeploySchedulerBackend(
// Maximum total cores this job may take across the cluster (Int.MaxValue = unlimited).
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
// Environment variables to pass to our executors
// NOTE(review): this commit removes the hard-coded whitelist below in favor
// of the user-extensible SparkContext.executorEnvs map (see start()).
val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
"SPARK_MEM",
"SPARK_CLASSPATH",
"SPARK_LIBRARY_PATH",
"SPARK_JAVA_OPTS",
"SPARK_TESTING"
)
// Memory used by each executor (in megabytes)
val executorMemory = {
if (System.getenv("SPARK_MEM") != null) {
......@@ -42,17 +33,11 @@ private[spark] class SparkDeploySchedulerBackend(
// Registers this backend with the standalone master and requests executors.
override def start() {
super.start()
// NOTE(review): the HashMap/for block below is the removed (pre-change) side
// of this diff; the merged code passes sc.executorEnvs directly instead.
val environment = new HashMap[String, String]
for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
if (System.getenv(key) != null) {
environment(key) = System.getenv(key)
}
}
// Akka URL of the driver-side scheduler actor that executors connect back to.
val masterUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
// {{SLAVEID}}/{{HOSTNAME}}/{{CORES}} placeholders are substituted by the worker at launch.
val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
// NOTE(review): the first Command line is pre-change, the second is its
// replacement using sc.executorEnvs; only the second survives the merge.
val command = Command("spark.executor.StandaloneExecutorBackend", args, environment)
val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command)
client = new Client(sc.env.actorSystem, master, jobDesc, this)
......
......@@ -33,15 +33,6 @@ private[spark] class CoarseMesosSchedulerBackend(
with MScheduler
with Logging {
// Environment variables to pass to our executors
// NOTE(review): removed by this commit — createCommand now iterates
// sc.executorEnvs instead of this fixed whitelist.
val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
"SPARK_MEM",
"SPARK_CLASSPATH",
"SPARK_LIBRARY_PATH",
"SPARK_JAVA_OPTS",
"SPARK_TESTING"
)
val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
// Memory used by each executor (in megabytes)
......@@ -123,13 +114,15 @@ private[spark] class CoarseMesosSchedulerBackend(
// Shell command that launches StandaloneExecutorBackend on the Mesos slave.
val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
val environment = Environment.newBuilder()
// NOTE(review): the two loop headers below are the old and new diff sides;
// the merged code iterates sc.executorEnvs rather than the removed
// ENV_VARS_TO_SEND_TO_EXECUTORS whitelist, logging any null (unset) value.
for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
if (System.getenv(key) != null) {
sc.executorEnvs.foreach { case(key, value) =>
if (value == null) {
logInfo("Environment variable not set: " + key)
} else {
// Translate each set variable into a Mesos Environment.Variable protobuf.
environment.addVariables(Environment.Variable.newBuilder()
.setName(key)
.setValue(System.getenv(key))
.setValue(value)
.build())
}
}
}
return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
}
......
......@@ -29,15 +29,6 @@ private[spark] class MesosSchedulerBackend(
with MScheduler
with Logging {
// Environment variables to pass to our executors
// NOTE(review): removed by this commit — the executor-info builder now
// iterates sc.executorEnvs instead of this fixed whitelist.
val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
"SPARK_MEM",
"SPARK_CLASSPATH",
"SPARK_LIBRARY_PATH",
"SPARK_JAVA_OPTS",
"SPARK_TESTING"
)
// Memory used by each executor (in megabytes)
val EXECUTOR_MEMORY = {
if (System.getenv("SPARK_MEM") != null) {
......@@ -94,13 +85,15 @@ private[spark] class MesosSchedulerBackend(
}
// Absolute path to the spark-executor launch script under sparkHome on the slave.
val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
val environment = Environment.newBuilder()
// NOTE(review): the two loop headers below are the old and new diff sides;
// the merged code iterates sc.executorEnvs rather than the removed
// ENV_VARS_TO_SEND_TO_EXECUTORS whitelist, logging any null (unset) value.
for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
if (System.getenv(key) != null) {
sc.executorEnvs.foreach { case(key, value) =>
if (value == null) {
logInfo("Environment variable not set: " + key)
} else {
// Translate each set variable into a Mesos Environment.Variable protobuf.
environment.addVariables(Environment.Variable.newBuilder()
.setName(key)
.setValue(System.getenv(key))
.setValue(value)
.build())
}
}
}
val memory = Resource.newBuilder()
.setName("mem")
......
0% — Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment