diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 72540c712a26b7accce5eb27c12e943c0996e164..8ed5dcbf5da9539bdb59d6ab18cbd9d99cdc02ea 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -643,10 +643,21 @@ class SparkContext( key = uri.getScheme match { case null | "file" => if (env.hadoop.isYarnMode()) { - logWarning("local jar specified as parameter to addJar under Yarn mode") - return + // In order for this to work on yarn the user must specify the --addjars option to + // the client to upload the file into the distributed cache to make it show up in the + // current working directory. + val fileName = new Path(uri.getPath).getName() + try { + env.httpFileServer.addJar(new File(fileName)) + } catch { + case e: Exception => { + logError("Error adding jar (" + e + "), was the --addJars option used?") + throw e + } + } + } else { + env.httpFileServer.addJar(new File(uri.getPath)) } - env.httpFileServer.addJar(new File(uri.getPath)) case _ => path } diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index beaae69aa269e30d98fc1b330acbea628878c2e6..a807ec603dafacbb05463e67b2d507bbc69fab8f 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -53,6 +53,7 @@ The command to launch the YARN Client is as follows: --worker-memory <MEMORY_PER_WORKER> \ --worker-cores <CORES_PER_WORKER> \ --queue <queue_name> \ + --addJars <any_local_files_used_in_SparkContext.addJar> \ --files <files_for_distributed_cache> \ --archives <archives_for_distributed_cache> @@ -88,3 +89,4 @@ The above starts a YARN Client programs which periodically polls the Application - We do not requesting container resources based on the number of cores. Thus the numbers of cores given via command line arguments cannot be guaranteed. - The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored. - The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt and your application should use the name as appSees.txt to reference it when running on YARN. +- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files. diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b4d243ed7afb33fc3000665821d6b52b0e5d8368..fb1b339f2707737bb7a4a02d76ed950f65715187 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -127,7 +127,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl originalPath: Path, replication: Short, localResources: HashMap[String,LocalResource], - fragment: String) = { + fragment: String, + appMasterOnly: Boolean = false): Unit = { val fs = FileSystem.get(conf) val newPath = new Path(dstDir, originalPath.getName()) logInfo("Uploading " + originalPath + " to " + newPath) @@ -149,6 +150,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl pathURI = new URI(newPath.toString() + "#" + fragment); } val distPath = pathURI.toString() + if (appMasterOnly == true) return if (resourceType == LocalResourceType.FILE) { distFiles match { case Some(path) => @@ -223,6 +225,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } + // handle any add jars + if ((args.addJars != null) && (!args.addJars.isEmpty())){ + args.addJars.split(',').foreach { case file: String => + val tmpURI = new URI(file) + val tmp = new Path(tmpURI) + copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources, + tmpURI.getFragment(), true) + } + } + // handle any distributed cache files if ((args.files != null) && (!args.files.isEmpty())){ args.files.split(',').foreach { case file: String => @@ -253,11 +265,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val env = new HashMap[String, String]() - // If log4j present, ensure ours overrides all others - if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./") + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) + Apps.addToEnvironment(env, Environment.CLASSPATH.name, + Environment.PWD.$() + Path.SEPARATOR + "*") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH") Client.populateHadoopClasspath(yarnConf, env) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_JAR_PATH") = @@ -279,6 +290,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString() } + // set the environment variables to be passed on to the Workers if (distFiles != None) { env("SPARK_YARN_CACHE_FILES") = distFiles.get env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get @@ -328,8 +340,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Add Xmx for am memory JAVA_OPTS += "-Xmx" + amMemory + "m " - JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + JAVA_OPTS += " -Djava.io.tmpdir=" + + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. @@ -345,6 +357,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 " JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 " } + if (env.isDefinedAt("SPARK_JAVA_OPTS")) { JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 30d9b6e60f8cd5745e192d0ea6c4165817940e86..0833153541fed2de1ce141fd0edd3f2646ce2308 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -24,6 +24,7 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo} // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware ! class ClientArguments(val args: Array[String]) { + var addJars: String = null var files: String = null var archives: String = null var userJar: String = null @@ -80,6 +81,10 @@ class ClientArguments(val args: Array[String]) { amQueue = value args = tail + case ("--addJars") :: value :: tail => + addJars = value + args = tail + case ("--files") :: value :: tail => files = value args = tail @@ -119,8 +124,9 @@ class ClientArguments(val args: Array[String]) { " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" + - " --files file Comma separated list of files to be distributed with the job.\n" + - " --archives archive Comma separated list of archives to be distributed with the job." + " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" + + " --files files Comma separated list of files to be distributed with the job.\n" + + " --archives archives Comma separated list of archives to be distributed with the job." ) System.exit(exitCode) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index d340b114df8e70b3a20b81768fecca64981dfe11..8dac9e02ac0b712fcf712c60575b43b3d2729f9c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -77,8 +77,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } - JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + JAVA_OPTS += " -Djava.io.tmpdir=" + + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " + // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same @@ -215,15 +216,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S def prepareEnvironment: HashMap[String, String] = { val env = new HashMap[String, String]() - // If log4j present, ensure ours overrides all others - if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) { - // Which is correct ? - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./") - } - - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH") + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) + Apps.addToEnvironment(env, Environment.CLASSPATH.name, + Environment.PWD.$() + Path.SEPARATOR + "*") Client.populateHadoopClasspath(yarnConf, env) // allow users to specify some environment variables