From c0b4095ee8b5363d157094ff11bab2c47e77dbf4 Mon Sep 17 00:00:00 2001 From: "Y.CORP.YAHOO.COM\\tgraves" <tgraves@thatenemy-lm.champ.corp.yahoo.com> Date: Mon, 26 Aug 2013 12:48:37 -0500 Subject: [PATCH] Change to use Yarn appropriate directories rather than /tmp or the user specified spark.local.dir --- .../spark/deploy/yarn/ApplicationMaster.scala | 17 +++++++++++++++++ .../main/scala/spark/deploy/yarn/Client.scala | 4 ++++ .../spark/deploy/yarn/WorkerRunnable.scala | 4 ++++ 3 files changed, 25 insertions(+) diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala index 15dbd1c0fb..cda3c21c58 100644 --- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -47,6 +47,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private var isFinished:Boolean = false def run() { + // setup the directories so things go to yarn approved directories rather + // than user specified and /tmp + System.setProperty("spark.local.dir", getLocalDirs()) appAttemptId = getApplicationAttemptId() resourceManager = registerWithResourceManager() @@ -89,6 +92,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e System.exit(0) } + + /** Get the Yarn approved local directories. */ + private def getLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so let's check both. We assume one of the 2 is set. 
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + var localDirs = System.getenv("LOCAL_DIRS") + val yarnLocalSysDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + yarnLocalSysDirs match { + case Some(s) => localDirs = s + case None => if (localDirs == null) localDirs = "" + } + return localDirs + } private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() diff --git a/yarn/src/main/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala index 9d3860b863..aa70a3edb8 100644 --- a/yarn/src/main/scala/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/Client.scala @@ -221,6 +221,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Add Xmx for am memory JAVA_OPTS += "-Xmx" + amMemory + "m " + JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + + // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. 
// The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same // node, spark gc effects all other containers performance (which can also be other spark containers) diff --git a/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala index f458f2f6a1..fe713b3629 100644 --- a/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala @@ -75,6 +75,10 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S if (env.isDefinedAt("SPARK_JAVA_OPTS")) { JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } + + JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same // node, spark gc effects all other containers performance (which can also be other spark containers) -- GitLab