Skip to content
Snippets Groups Projects
Commit b6571541 authored by tgravescs's avatar tgravescs
Browse files

Fix the Worker to use CoarseGrainedExecutorBackend and modify classpath to be explicit

about inclusion of spark.jar and app.jar
parent 35886f34
No related branches found
No related tags found
No related merge requests found
......@@ -265,11 +265,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val env = new HashMap[String, String]()
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
Apps.addToEnvironment(env, Environment.CLASSPATH.name,
Environment.PWD.$() + Path.SEPARATOR + "*")
Client.populateHadoopClasspath(yarnConf, env)
Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_JAR_PATH") =
localResources("spark.jar").getResource().getScheme.toString() + "://" +
......@@ -451,4 +447,30 @@ object Client {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
}
}
def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
// If log4j present, ensure ours overrides all others
if (addLog4j) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "log4j.properties")
}
// normally the users app.jar is last in case conflicts with spark jars
val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "app.jar")
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "spark.jar")
Client.populateHadoopClasspath(conf, env)
if (!userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "app.jar")
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*")
}
}
......@@ -121,7 +121,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ?
" -XX:OnOutOfMemoryError='kill %p' " +
JAVA_OPTS +
" org.apache.spark.executor.StandaloneExecutorBackend " +
" org.apache.spark.executor.CoarseGrainedExecutorBackend " +
masterAddress + " " +
slaveId + " " +
hostname + " " +
......@@ -216,10 +216,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
Apps.addToEnvironment(env, Environment.CLASSPATH.name,
Environment.PWD.$() + Path.SEPARATOR + "*")
Client.populateHadoopClasspath(yarnConf, env)
Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
// allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment