From b6571541a6043ed12cb8af51e198e207968394a7 Mon Sep 17 00:00:00 2001
From: tgravescs <tgraves@thatenemy-lm.champ.corp.yahoo.com>
Date: Mon, 21 Oct 2013 14:05:15 -0500
Subject: [PATCH] Fix the Worker to use CoarseGrainedExecutorBackend and make
 the classpath explicit about including spark.jar and app.jar

---
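Notes (not part of the commit message): this consolidates the CLASSPATH
setup that was duplicated in Client.prepareEnvironment and
WorkerRunnable.prepareEnvironment into a shared Client.populateClasspath,
pins spark.jar ahead of the user's app.jar by default, and adds a
spark.yarn.user.classpath.first system property to reverse that order. It
also updates the executor launch command for the rename of
StandaloneExecutorBackend to CoarseGrainedExecutorBackend.
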
 .../org/apache/spark/deploy/yarn/Client.scala | 32 ++++++++++++++++---
 .../spark/deploy/yarn/WorkerRunnable.scala    |  7 ++--
 2 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8afb3e39cb..1a380ae714 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -265,11 +265,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name,
-      Environment.PWD.$() + Path.SEPARATOR + "*")
-
-    Client.populateHadoopClasspath(yarnConf, env)
+    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_JAR_PATH") = 
       localResources("spark.jar").getResource().getScheme.toString() + "://" +
@@ -451,4 +447,30 @@ object Client {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
     }
   }
+
+  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+    // If log4j is present, put it first so ours overrides all others
+    if (addLog4j) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+        Path.SEPARATOR + "log4j.properties")
+    }
+    // Normally the user's app.jar goes last, in case it conflicts with the Spark jars
+    val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+      .toBoolean
+    if (userClasspathFirst) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+        Path.SEPARATOR + "app.jar")
+    }
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Path.SEPARATOR + "spark.jar")
+    Client.populateHadoopClasspath(conf, env)
+
+    if (!userClasspathFirst) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+        Path.SEPARATOR + "app.jar")
+    }
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Path.SEPARATOR + "*")
+  }
 }
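
Reviewer note, not part of the patch: below is a minimal, self-contained
sketch of the entry ordering that populateClasspath above builds, assuming
Apps.addToEnvironment simply appends each entry to CLASSPATH; the object
and method names are illustrative only.

    // Sketch only: mirrors the ordering logic of populateClasspath, with
    // Apps.addToEnvironment replaced by list appends and the Hadoop
    // classpath stubbed as a placeholder string.
    object ClasspathOrder {
      def entries(addLog4j: Boolean, userClasspathFirst: Boolean): Seq[String] = {
        val buf = scala.collection.mutable.ArrayBuffer("$PWD")
        if (addLog4j) buf += "$PWD/log4j.properties"   // ours overrides all others
        if (userClasspathFirst) buf += "$PWD/app.jar"  // user classes win conflicts
        buf += "$PWD/spark.jar"
        buf += "<hadoop classpath>"                    // Client.populateHadoopClasspath
        if (!userClasspathFirst) buf += "$PWD/app.jar" // default: spark.jar wins
        buf += "$PWD/*"
        buf.toSeq
      }

      def main(args: Array[String]) {
        // Default ordering, then with spark.yarn.user.classpath.first=true:
        println(entries(addLog4j = true, userClasspathFirst = false).mkString(":"))
        println(entries(addLog4j = true, userClasspathFirst = true).mkString(":"))
      }
    }

With the default, spark.jar and the Hadoop classpath resolve before
app.jar, so Spark's own dependencies win version conflicts.
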
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 8dac9e02ac..ba352daac4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -121,7 +121,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       // TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ?
       " -XX:OnOutOfMemoryError='kill %p' " +
       JAVA_OPTS +
-      " org.apache.spark.executor.StandaloneExecutorBackend " +
+      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
       masterAddress + " " +
       slaveId + " " +
       hostname + " " +
@@ -216,10 +216,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, 
-      Environment.PWD.$() + Path.SEPARATOR + "*")
-    Client.populateHadoopClasspath(yarnConf, env)
+    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
 
     // allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
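
Reviewer note, not part of the patch: spark.yarn.user.classpath.first is
read with System.getProperty on the client side, so a user whose app.jar
must shadow the Spark classes sets it on the JVM that runs the YARN
Client; a minimal illustration, assuming the property is set before the
container environment is prepared:

    // Illustrative only: set the property programmatically before the
    // Client builds the container environment ...
    System.setProperty("spark.yarn.user.classpath.first", "true")
    // ... or pass it on the client JVM command line:
    //   -Dspark.yarn.user.classpath.first=true
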
-- 
GitLab