diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4243ef480ba39ce083d3c17a5c4e05f972aa75dc..fecd8f2cc2d489f48bf74fb8b45710de09907a56 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -68,15 +68,29 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.executor.memoryOverhead</code></td>
-  <td>384</code></td>
+  <td><code>spark.yarn.dist.archives</code></td>
+  <td>(none)</td>
+  <td>
+    Comma-separated list of archives to be extracted into the working directory of each executor.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.dist.files</code></td>
+  <td>(none)</td>
+  <td>
+    Comma-separated list of files to be placed in the working directory of each executor.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.executor.memoryOverhead</code></td>
+  <td>384</td>
   <td>
     The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>
 </tr>
 <tr>
   <td><code>spark.yarn.driver.memoryOverhead</code></td>
-  <td>384</code></td>
+  <td>384</td>
   <td>
     The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
   </td>
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index fd3ef9e1fa2de318d320ccb3585c238fd3a24475..62f9b3cf5ab885a970680b6acc7d866a4658731e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.InputFormatInfo
-import org.apache.spark.util.IntParam
-import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
@@ -45,6 +44,18 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
   parseArgs(args.toList)
 
+  // If the env variables SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES are set in yarn-client,
+  // unqualified paths should default to hdfs://
+  files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
+  archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+
+  // spark.yarn.dist.archives/spark.yarn.dist.files default to file:// if no scheme is specified,
+  // for both yarn-client and yarn-cluster
+  files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
+    map(p => Utils.resolveURIs(p)).orNull)
+  archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
+    map(p => Utils.resolveURIs(p)).orNull)
+
   private def parseArgs(inputArgs: List[String]): Unit = {
     val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
     val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 858bcaa95b40967d5c7f104c5889971cb018069e..8f2267599914ccec6bf6255a9529237e88de5fbe 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -162,7 +162,7 @@ trait ClientBase extends Logging {
     val fs = FileSystem.get(conf)
     val remoteFs = originalPath.getFileSystem(conf)
     var newPath = originalPath
-    if (! compareFs(remoteFs, fs)) {
+    if (!compareFs(remoteFs, fs)) {
       newPath = new Path(dstDir, originalPath.getName())
       logInfo("Uploading " + originalPath + " to " + newPath)
       FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
@@ -250,6 +250,7 @@ trait ClientBase extends Logging {
         }
       }
     }
+    logInfo("Prepared local resources " + localResources)
     sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
 
     UserGroupInformation.getCurrentUser().addCredentials(credentials)
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 039cf4f2761190bd85cf3a66c23ef402267aea46..412dfe38d55ebc57e4e62bbf1e88d34e9e985d63 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -70,9 +70,7 @@ private[spark] class YarnClientSchedulerBackend(
       ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
       ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
       ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"),
-      ("--files", "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
-      ("--archives", "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"))
+      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
       .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
     logDebug("ClientArguments called with: " + argsArrayBuf)
 
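
A minimal, standalone sketch of the precedence chain this patch installs for --files (--archives is handled identically): the explicit command-line argument wins, then SPARK_YARN_DIST_FILES, then spark.yarn.dist.files. The resolveURIs helper below is a hypothetical simplification of org.apache.spark.util.Utils.resolveURIs, and the Map-based env/conf parameters stand in for sys.env and SparkConf; none of this is the actual Spark code.

import java.net.URI

// Hypothetical stand-in for Utils.resolveURIs: scheme-less paths are
// resolved to file:// URIs; already-qualified URIs pass through unchanged.
def resolveURIs(paths: String): String =
  paths.split(",").map { p =>
    if (new URI(p).getScheme != null) p
    else new java.io.File(p).getAbsoluteFile.toURI.toString
  }.mkString(",")

// Precedence: explicit --files value, then the env variable (left untouched,
// so bare paths keep the cluster default of hdfs://), then the Spark conf
// entry (resolved, so bare paths default to file://).
def resolveDistFiles(
    cliFiles: Option[String],
    env: Map[String, String],
    conf: Map[String, String]): Option[String] =
  cliFiles
    .orElse(env.get("SPARK_YARN_DIST_FILES"))
    .orElse(conf.get("spark.yarn.dist.files").map(resolveURIs))

// Only the Spark conf is set here, so the bare path gains a file: scheme.
println(resolveDistFiles(None, Map.empty, Map("spark.yarn.dist.files" -> "/tmp/lookup.dat")))

Run as a Scala script, the last line prints Some(file:/tmp/lookup.dat), matching the patch's intent that unschemed spark.yarn.dist.* paths resolve to the local filesystem rather than HDFS.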