diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 036c7191adcffd1383c05c184105ff50b15f662f..fa82d2b3245dade349329cf8120c62ae1433fc11 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -60,6 +60,13 @@ private[spark] class Executor(
     System.setProperty(key, value)
   }
 
+  // If we are in YARN mode, systems can have different disk layouts, so we must set
+  // spark.local.dir to whatever YARN on this system says is available. This will be
+  // used later when SparkEnv is created.
+  if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
+    System.setProperty("spark.local.dir", getYarnLocalDirs())
+  }
+
   // Create our ClassLoader and set it on this thread
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
@@ -107,6 +114,21 @@ private[spark] class Executor(
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
   }
 
+  /** Get the YARN-approved local directories. */
+  private def getYarnLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x use different environment variable names for the
+    // local dirs, so let's check both. We assume one of the two is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .orElse(Option(System.getenv("LOCAL_DIRS")))
+      .getOrElse("")
+
+    if (localDirs.isEmpty) {
+      throw new Exception("YARN local dirs can't be empty")
+    }
+    localDirs
+  }
+
   class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
     extends Runnable {
 
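For context, the YARN_LOCAL_DIRS/LOCAL_DIRS fallback above can be exercised outside a real container. A minimal sketch, not part of the patch, where fakeEnv is a hypothetical stand-in for System.getenv:

    // Standalone sketch of the env-var fallback used by getYarnLocalDirs().
    // `fakeEnv` is a hypothetical stand-in for System.getenv so the lookup
    // order can be tested without a running YARN container.
    object LocalDirsSketch {
      def yarnLocalDirs(fakeEnv: Map[String, String]): String = {
        val localDirs = fakeEnv.get("YARN_LOCAL_DIRS")   // Hadoop 0.23.x
          .orElse(fakeEnv.get("LOCAL_DIRS"))             // Hadoop 2.x
          .getOrElse("")
        if (localDirs.isEmpty) throw new Exception("YARN local dirs can't be empty")
        localDirs
      }

      def main(args: Array[String]): Unit = {
        // On Hadoop 2.x only LOCAL_DIRS is set; the fallback still finds it.
        println(yarnLocalDirs(Map("LOCAL_DIRS" -> "/data1/yarn/local,/data2/yarn/local")))
      }
    }
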
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index cac9c5e4b6ff456feee26522a66aac3f2b4ec81e..1a0afd19d43597f932f4fefe2be6c7630f40aaa9 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -63,7 +63,6 @@ The command to launch the YARN Client is as follows:
       --master-memory <MEMORY_FOR_MASTER> \
       --worker-memory <MEMORY_PER_WORKER> \
       --worker-cores <CORES_PER_WORKER> \
-      --user <hadoop_user> \
       --queue <queue_name>
 
 For example:
@@ -83,5 +82,4 @@ The above starts a YARN Client programs which periodically polls the Application
 
 - When your application instantiates a Spark context it must use a special "yarn-standalone" master url. This starts the scheduler without forcing it to connect to a cluster. A good way to handle this is to pass "yarn-standalone" as an argument to your program, as shown in the example above.
 - We do not request container resources based on the number of cores. Thus the number of cores given via command line arguments cannot be guaranteed.
-- Currently, we have not yet integrated with hadoop security. If --user is present, the hadoop_user specified will be used to run the tasks on the cluster. If unspecified, current user will be used (which should be valid in cluster).
-  Once hadoop security support is added, and if hadoop cluster is enabled with security, additional restrictions would apply via delegation tokens passed.
+- The local directories used by Spark will be the local directories configured for YARN (the Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
index 15dbd1c0fb293cf89089f02c35dbc6ae65f11109..0f3b6bc1a65d0c77e6d84d330529705ff5fd1238 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -47,6 +47,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private var isFinished:Boolean = false
 
   def run() {
+    // Set up the directories so things go to YARN-approved directories rather
+    // than user-specified ones or /tmp.
+    System.setProperty("spark.local.dir", getLocalDirs())
     
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
@@ -89,6 +92,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
     System.exit(0)
   }
+
+  /** Get the YARN-approved local directories. */
+  private def getLocalDirs(): String = {
+    // Hadoop 0.23 and 2.x use different environment variable names for the
+    // local dirs, so let's check both. We assume one of the two is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+      .orElse(Option(System.getenv("LOCAL_DIRS")))
+      .getOrElse("")
+
+    if (localDirs.isEmpty) {
+      throw new Exception("YARN local dirs can't be empty")
+    }
+    localDirs
+  }
   
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
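getLocalDirs() above duplicates Executor.getYarnLocalDirs(). A hypothetical consolidation, not part of this patch, could share one helper:

    // Hypothetical shared helper (not in the patch) that both Executor and
    // ApplicationMaster could call instead of duplicating the env lookup.
    object YarnLocalDirs {
      def get(): String =
        Option(System.getenv("YARN_LOCAL_DIRS"))        // Hadoop 0.23.x
          .orElse(Option(System.getenv("LOCAL_DIRS")))  // Hadoop 2.x
          .getOrElse(throw new Exception("YARN local dirs can't be empty"))
    }

A single copy would keep the lookup order and error message from drifting apart between the two call sites.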
diff --git a/yarn/src/main/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
index e84fb7c9858a40a6f335c1b3bca1f2aa6bb9ee25..eb2a8cc6420765c88d9c87957f55f8540080f59f 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
@@ -223,6 +223,9 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     // Add Xmx for am memory
     JAVA_OPTS += "-Xmx" + amMemory + "m "
 
+    JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
+                                                 YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+
     // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
     // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
     // node, spark gc effects all other containers performance (which can also be other spark containers)
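For reference, a sketch of the option assembled by the JAVA_OPTS line above (and the identical one in WorkerRunnable below). Environment.PWD.$() yields a reference to the container's working directory that the launch environment expands (e.g. $PWD on Unix), and YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR is the relative path ./tmp:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    object TmpDirOptionSketch {
      def main(args: Array[String]): Unit = {
        // Same expression as in Client/WorkerRunnable; Path joins the two parts.
        val opt = " -Djava.io.tmpdir=" +
          new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
        println(opt)  // on Unix, roughly: " -Djava.io.tmpdir=$PWD/tmp"
      }
    }

This keeps each container's java.io.tmpdir inside its own YARN-managed working directory instead of a shared /tmp.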
diff --git a/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala
index 3e007509e6e12dfaaea646be4ae84e8c1dbe56cc..0e1fd9b680e3987a5f46280af4a7697de25648b1 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala
@@ -75,6 +75,10 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
       JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
     }
+
+    JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
+                                                 YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+
     // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
     // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
     // node, spark gc effects all other containers performance (which can also be other spark containers)
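Tying the pieces together: the Executor branch at the top of this patch only fires when the launcher exports SPARK_YARN_MODE=true into the container environment. A minimal sketch of that guard, simplified to the Hadoop 2.x variable and not part of the patch:

    // Sketch of the SPARK_YARN_MODE guard from Executor.scala above.
    // java.lang.Boolean.valueOf(null) is false, so a missing variable
    // leaves spark.local.dir untouched.
    object YarnModeGuardSketch {
      def main(args: Array[String]): Unit = {
        if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
          // Simplified: the real code also checks YARN_LOCAL_DIRS (0.23.x).
          System.setProperty("spark.local.dir", sys.env.getOrElse("LOCAL_DIRS", ""))
        }
        // Prints "null" when the guard did not fire and nothing else set it.
        println("spark.local.dir = " + System.getProperty("spark.local.dir"))
      }
    }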