diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8afb3e39cb8a6a0e7df4c4e41e03083867601b71..1a380ae714534bc06146d62c0921a404bc64ce5e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -265,11 +265,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name,
-      Environment.PWD.$() + Path.SEPARATOR + "*")
-
-    Client.populateHadoopClasspath(yarnConf, env)
+    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_JAR_PATH") = 
       localResources("spark.jar").getResource().getScheme.toString() + "://" +
@@ -451,4 +447,30 @@ object Client {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
     }
   }
+
+  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+    // If log4j present, ensure ours overrides all others
+    if (addLog4j) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+        Path.SEPARATOR + "log4j.properties")
+    }
+    // normally the users app.jar is last in case conflicts with spark jars
+    val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+      .toBoolean
+    if (userClasspathFirst) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+        Path.SEPARATOR + "app.jar")
+    }
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Path.SEPARATOR + "spark.jar")
+    Client.populateHadoopClasspath(conf, env)
+
+    if (!userClasspathFirst) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+        Path.SEPARATOR + "app.jar")
+    }
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Path.SEPARATOR + "*")
+  }
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 8dac9e02ac0b712fcf712c60575b43b3d2729f9c..ba352daac485d0381a2df87f391b8435a315bc61 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -121,7 +121,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       // TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ?
       " -XX:OnOutOfMemoryError='kill %p' " +
       JAVA_OPTS +
-      " org.apache.spark.executor.StandaloneExecutorBackend " +
+      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
       masterAddress + " " +
       slaveId + " " +
       hostname + " " +
@@ -216,10 +216,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, 
-      Environment.PWD.$() + Path.SEPARATOR + "*")
-    Client.populateHadoopClasspath(yarnConf, env)
+    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
 
     // allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))