From a35472e1dd2ea1b5a0b1fb6b382f5a98f5aeba5a Mon Sep 17 00:00:00 2001
From: tgravescs <tgraves_cs@yahoo.com>
Date: Mon, 4 Nov 2013 09:40:40 -0600
Subject: [PATCH] Allow Spark on YARN to be run from HDFS. Allows
 spark.jar, app.jar, and log4j.properties to be placed in HDFS.
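
The Spark assembly no longer has to be local to the client: SPARK_JAR can
point at an HDFS URI, for example (hypothetical path):

    export SPARK_JAR=hdfs://namenode:8020/user/spark/spark-assembly.jar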

---
 docs/running-on-yarn.md                       |   1 +
 pom.xml                                       |   6 +
 yarn/pom.xml                                  |  50 ++++
 .../spark/deploy/yarn/ApplicationMaster.scala |   2 +-
 .../org/apache/spark/deploy/yarn/Client.scala | 276 +++++++++---------
 .../yarn/ClientDistributedCacheManager.scala  | 228 +++++++++++++++
 .../spark/deploy/yarn/WorkerRunnable.scala    |  42 +--
 .../ClientDistributedCacheManagerSuite.scala  | 220 ++++++++++++++
 8 files changed, 653 insertions(+), 172 deletions(-)
 create mode 100644 yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
 create mode 100644 yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 2898af0bed..6fd1d0d150 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -21,6 +21,7 @@ The assembled JAR will be something like this:
 # Preparations
 
 - Building a YARN-enabled assembly (see above).
+- The assembled JAR can be installed into HDFS or used locally.
 - Your application code must be packaged into a separate JAR file.
 
 If you want to test out the YARN deployment mode, you can use the current Spark examples. A `spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}` file can be generated by running `sbt/sbt assembly`. NOTE: since the documentation you're reading is for Spark version {{site.SPARK_VERSION}}, we are assuming here that you have downloaded Spark {{site.SPARK_VERSION}} or checked it out of source control. If you are using a different version of Spark, the version numbers in the jar generated by the sbt package command will obviously be different.
diff --git a/pom.xml b/pom.xml
index 53ac82efd0..edcc3b35cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -385,6 +385,12 @@
         <version>3.1</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-all</artifactId>
+        <version>1.8.5</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>org.scalacheck</groupId>
         <artifactId>scalacheck_2.9.3</artifactId>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 3bc619df07..8a065c6d7d 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -61,6 +61,16 @@
       <groupId>org.apache.avro</groupId>
       <artifactId>avro-ipc</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.9.3</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -106,6 +116,46 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>test</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <exportAntProperties>true</exportAntProperties>
+              <tasks>
+                <property name="spark.classpath" refid="maven.test.classpath" />
+                <property environment="env" />
+                <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variable and retry.">
+                  <condition>
+                    <not>
+                      <or>
+                        <isset property="env.SCALA_HOME" />
+                        <isset property="env.SCALA_LIBRARY_PATH" />
+                      </or>
+                    </not>
+                  </condition>
+                </fail>
+              </tasks>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <environmentVariables>
+            <SPARK_HOME>${basedir}/..</SPARK_HOME>
+            <SPARK_TESTING>1</SPARK_TESTING>
+            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
+          </environmentVariables>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index c1a87d3373..4302ef4cda 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -349,7 +349,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     try {
       val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
       if (!preserveFiles) {
-        stagingDirPath = new Path(System.getenv("SPARK_YARN_JAR_PATH")).getParent()
+        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
         if (stagingDirPath == null) {
           logError("Staging directory is null")
           return
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1a380ae714..4e0e060ddc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,26 +17,31 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.net.{InetSocketAddress, URI}
+import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
+
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
+import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.client.YarnClientImpl
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{Apps, Records}
+
 import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
 import scala.collection.JavaConversions._
+
 import org.apache.spark.Logging 
 import org.apache.spark.util.Utils
-import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.spark.deploy.SparkHadoopUtil
 
 class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
@@ -46,13 +51,14 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-  private var distFiles = None: Option[String]
-  private var distFilesTimeStamps = None: Option[String]
-  private var distFilesFileSizes = None: Option[String]
-  private var distArchives = None: Option[String]
-  private var distArchivesTimeStamps = None: Option[String]
-  private var distArchivesFileSizes = None: Option[String]
-  
+  private val SPARK_STAGING: String = ".sparkStaging"
+  private val distCacheMgr = new ClientDistributedCacheManager()
+
+  // staging directory is private! -> rwx------
+  val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short)
+  // app files are world readable and owner writable -> rw-r--r--
+  val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644: Short)
+
   def run() {
     init(yarnConf)
     start()
@@ -63,8 +69,9 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     verifyClusterResources(newApp)
     val appContext = createApplicationSubmissionContext(appId)
-    val localResources = prepareLocalResources(appId, ".sparkStaging")
-    val env = setupLaunchEnv(localResources)
+    val appStagingDir = getAppStagingDir(appId)
+    val localResources = prepareLocalResources(appStagingDir)
+    val env = setupLaunchEnv(localResources, appStagingDir)
     val amContainer = createContainerLaunchContext(newApp, localResources, env)
 
     appContext.setQueue(args.amQueue)
@@ -76,7 +83,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     monitorApplication(appId)
     System.exit(0)
   }
-  
+
+  def getAppStagingDir(appId: ApplicationId): String = {
+    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+  }
 
   def logClusterResourceDetails() {
     val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
@@ -116,73 +126,73 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     return appContext
   }
 
+  /**
+   * Checks whether two file systems are the same.
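+   * For example, "hdfs://nn1:8020" and "hdfs://nn1.example.com:8020" compare
+   * equal when both hosts resolve to the same canonical name (hypothetical hosts).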
+   */
+  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+    val srcUri = srcFs.getUri()
+    val dstUri = destFs.getUri()
+    if (srcUri.getScheme() == null) {
+      return false
+    }
+    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+      return false
+    }
+    var srcHost = srcUri.getHost()
+    var dstHost = dstUri.getHost()
+    if ((srcHost != null) && (dstHost != null)) {
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
+      } catch {
+        case e: UnknownHostException =>
+          return false
+      }
+      if (!srcHost.equals(dstHost)) {
+        return false
+      }
+    } else if (srcHost == null && dstHost != null) {
+      return false
+    } else if (srcHost != null && dstHost == null) {
+      return false
+    }
+    // check for ports
+    if (srcUri.getPort() != dstUri.getPort()) {
+      return false
+    }
+    return true
+  }
+
   /**
-   * Copy the local file into HDFS and configure to be distributed with the
-   * job via the distributed cache.
-   * If a fragment is specified the file will be referenced as that fragment.
+   * Copy the file into HDFS if needed.
    */
-  private def copyLocalFile(
+  private def copyRemoteFile(
       dstDir: Path,
-      resourceType: LocalResourceType,
       originalPath: Path,
       replication: Short,
-      localResources: HashMap[String,LocalResource],
-      fragment: String,
-      appMasterOnly: Boolean = false): Unit = {
+      setPerms: Boolean = false): Path = {
     val fs = FileSystem.get(conf)
-    val newPath = new Path(dstDir, originalPath.getName())
-    logInfo("Uploading " + originalPath + " to " + newPath)
-    fs.copyFromLocalFile(false, true, originalPath, newPath)
-    fs.setReplication(newPath, replication);
-    val destStatus = fs.getFileStatus(newPath)
-
-    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-    amJarRsrc.setType(resourceType)
-    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
-    amJarRsrc.setTimestamp(destStatus.getModificationTime())
-    amJarRsrc.setSize(destStatus.getLen())
-    var pathURI: URI = new URI(newPath.toString() + "#" + originalPath.getName());
-    if ((fragment == null) || (fragment.isEmpty())){
-      localResources(originalPath.getName()) = amJarRsrc
-    } else {
-      localResources(fragment) = amJarRsrc
-      pathURI = new URI(newPath.toString() + "#" + fragment);
-    }
-    val distPath = pathURI.toString()
-    if (appMasterOnly == true) return
-    if (resourceType == LocalResourceType.FILE) {
-      distFiles match {
-        case Some(path) =>
-          distFilesFileSizes = Some(distFilesFileSizes.get + "," + 
-            destStatus.getLen().toString())
-          distFilesTimeStamps = Some(distFilesTimeStamps.get + "," + 
-            destStatus.getModificationTime().toString())
-          distFiles = Some(path + "," + distPath)
-        case _ => 
-          distFilesFileSizes = Some(destStatus.getLen().toString())
-          distFilesTimeStamps = Some(destStatus.getModificationTime().toString())
-          distFiles = Some(distPath)
-      }
-    } else {
-      distArchives match {
-        case Some(path) =>
-          distArchivesTimeStamps = Some(distArchivesTimeStamps.get + "," +
-            destStatus.getModificationTime().toString())
-          distArchivesFileSizes = Some(distArchivesFileSizes.get + "," + 
-            destStatus.getLen().toString())
-          distArchives = Some(path + "," + distPath)
-        case _ => 
-          distArchivesTimeStamps = Some(destStatus.getModificationTime().toString())
-          distArchivesFileSizes = Some(destStatus.getLen().toString())
-          distArchives = Some(distPath)
-      }
-    }
+    val remoteFs = originalPath.getFileSystem(conf)
+    var newPath = originalPath
+    if (!compareFs(remoteFs, fs)) {
+      newPath = new Path(dstDir, originalPath.getName())
+      logInfo("Uploading " + originalPath + " to " + newPath)
+      FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+      fs.setReplication(newPath, replication)
+      if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
+    }
+    // Resolve any symlinks in the URI path so that a "current" symlink
+    // pointing to a specific version shows that specific version
+    // in the distributed cache configuration.
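+    // e.g. a symlink /user/spark/share/current -> spark-0.8.0 (hypothetical
+    // layout) resolves to the concrete versioned path.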
+    val qualPath = fs.makeQualified(newPath)
+    val fc = FileContext.getFileContext(qualPath.toUri(), conf)
+    val destPath = fc.resolvePath(qualPath)
+    destPath
   }
 
-  def prepareLocalResources(appId: ApplicationId, sparkStagingDir: String): HashMap[String, LocalResource] = {
+  def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
     logInfo("Preparing Local resources")
-    // Upload Spark and the application JAR to the remote file system
+    // Upload Spark and the application JAR to the remote file system if necessary
     // Add them as local resources to the AM
     val fs = FileSystem.get(conf)
 
@@ -193,9 +203,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         System.exit(1)
       }
     }
-
-    val pathSuffix = sparkStagingDir + "/" + appId.toString() + "/"
-    val dst = new Path(fs.getHomeDirectory(), pathSuffix)
+    val dst = new Path(fs.getHomeDirectory(), appStagingDir)
     val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
 
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -203,55 +211,65 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       dstFs.addDelegationTokens(delegTokenRenewer, credentials);
     }
     val localResources = HashMap[String, LocalResource]()
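+    // Create the app staging dir with permissions private to the user (rwx------).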
+    FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
+
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+
+    if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
+      logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
+      System.exit(1)
+    }
 
-    Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF"))
+    Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar, 
+      Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
     .foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
-        val src = new Path(localPath)
-        val newPath = new Path(dst, destName)
-        logInfo("Uploading " + src + " to " + newPath)
-        fs.copyFromLocalFile(false, true, src, newPath)
-        fs.setReplication(newPath, replication);
-        val destStatus = fs.getFileStatus(newPath)
-
-        val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-        amJarRsrc.setType(LocalResourceType.FILE)
-        amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
-        amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
-        amJarRsrc.setTimestamp(destStatus.getModificationTime())
-        amJarRsrc.setSize(destStatus.getLen())
-        localResources(destName) = amJarRsrc
+        var localURI = new URI(localPath)
+        // If no scheme is specified, assume the path is on the local
+        // filesystem, to keep the behavior consistent with Hadoop.
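+        // e.g. "/home/me/app.jar" becomes "file:/home/me/app.jar", while
+        // "hdfs://nn:8020/user/me/app.jar" (hypothetical paths) is used as-is.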
+        if (localURI.getScheme() == null) {
+          localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString())
+        }
+        val setPermissions = destName.equals(Client.APP_JAR)
+        val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+          destName, statCache)
       }
     }
 
     // handle any add jars
     if ((args.addJars != null) && (!args.addJars.isEmpty())){
       args.addJars.split(',').foreach { case file: String =>
-        val tmpURI = new URI(file)
-        val tmp = new Path(tmpURI)
-        copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
-          tmpURI.getFragment(), true)
+        val localURI = new URI(file.trim())
+        val localPath = new Path(localURI)
+        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+        val destPath = copyRemoteFile(dst, localPath, replication)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+          linkname, statCache, true)
       }
     }
 
     // handle any distributed cache files
     if ((args.files != null) && (!args.files.isEmpty())){
       args.files.split(',').foreach { case file: String =>
-        val tmpURI = new URI(file)
-        val tmp = new Path(tmpURI)
-        copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
-          tmpURI.getFragment())
+        val localURI = new URI(file.trim())
+        val localPath = new Path(localURI)
+        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+        val destPath = copyRemoteFile(dst, localPath, replication)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+          linkname, statCache)
       }
     }
 
     // handle any distributed cache archives
     if ((args.archives != null) && (!args.archives.isEmpty())) {
       args.archives.split(',').foreach { case file:String =>
-        val tmpURI = new URI(file)
-        val tmp = new Path(tmpURI)
-        copyLocalFile(dst, LocalResourceType.ARCHIVE, tmp, replication, 
-          localResources, tmpURI.getFragment())
+        val localURI = new URI(file.trim())
+        val localPath = new Path(localURI)
+        val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+        val destPath = copyRemoteFile(dst, localPath, replication)
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, 
+          linkname, statCache)
       }
     }
 
@@ -259,44 +277,21 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     return localResources
   }
   
-  def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = {
+  def setupLaunchEnv(
+      localResources: HashMap[String, LocalResource], 
+      stagingDir: String): HashMap[String, String] = {
     logInfo("Setting up the launch environment")
-    val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null)
+    val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
 
     val env = new HashMap[String, String]()
 
     Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
-    env("SPARK_YARN_JAR_PATH") = 
-      localResources("spark.jar").getResource().getScheme.toString() + "://" +
-      localResources("spark.jar").getResource().getFile().toString()
-    env("SPARK_YARN_JAR_TIMESTAMP") =  localResources("spark.jar").getTimestamp().toString()
-    env("SPARK_YARN_JAR_SIZE") =  localResources("spark.jar").getSize().toString()
-
-    env("SPARK_YARN_USERJAR_PATH") =
-      localResources("app.jar").getResource().getScheme.toString() + "://" +
-      localResources("app.jar").getResource().getFile().toString()
-    env("SPARK_YARN_USERJAR_TIMESTAMP") =  localResources("app.jar").getTimestamp().toString()
-    env("SPARK_YARN_USERJAR_SIZE") =  localResources("app.jar").getSize().toString()
-
-    if (log4jConfLocalRes != null) {
-      env("SPARK_YARN_LOG4J_PATH") =
-        log4jConfLocalRes.getResource().getScheme.toString() + "://" + log4jConfLocalRes.getResource().getFile().toString()
-      env("SPARK_YARN_LOG4J_TIMESTAMP") =  log4jConfLocalRes.getTimestamp().toString()
-      env("SPARK_YARN_LOG4J_SIZE") =  log4jConfLocalRes.getSize().toString()
-    }
+    env("SPARK_YARN_STAGING_DIR") = stagingDir
 
     // set the environment variables to be passed on to the Workers
-    if (distFiles != None) {
-      env("SPARK_YARN_CACHE_FILES") = distFiles.get
-      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get
-      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = distFilesFileSizes.get
-    }
-    if (distArchives != None) {
-      env("SPARK_YARN_CACHE_ARCHIVES") = distArchives.get
-      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = distArchivesTimeStamps.get
-      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") = distArchivesFileSizes.get
-    }
+    distCacheMgr.setDistFilesEnv(env)
+    distCacheMgr.setDistArchivesEnv(env)
 
     // allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
@@ -365,6 +360,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
     }
 
+    if (args.userClass == null) {
+      logError("Error: You must specify a user class!")
+      System.exit(1)
+    }
+
     val commands = List[String](javaCommand + 
       " -server " +
       JAVA_OPTS +
@@ -432,6 +432,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 }
 
 object Client {
+  val SPARK_JAR: String = "spark.jar"
+  val APP_JAR: String = "app.jar"
+  val LOG4J_PROP: String = "log4j.properties"
+
   def main(argStrings: Array[String]) {
     // Set an env variable indicating we are running in YARN mode.
     // Note that anything with SPARK prefix gets propagated to all (remote) processes
@@ -453,22 +457,22 @@ object Client {
     // If log4j present, ensure ours overrides all others
     if (addLog4j) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
-        Path.SEPARATOR + "log4j.properties")
+        Path.SEPARATOR + LOG4J_PROP)
     }
     // normally the users app.jar is last in case conflicts with spark jars
     val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
-        Path.SEPARATOR + "app.jar")
+        Path.SEPARATOR + APP_JAR)
     }
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
-      Path.SEPARATOR + "spark.jar")
+      Path.SEPARATOR + SPARK_JAR)
     Client.populateHadoopClasspath(conf, env)
 
     if (!userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
-        Path.SEPARATOR + "app.jar")
+        Path.SEPARATOR + APP_JAR)
     }
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
       Path.SEPARATOR + "*")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
new file mode 100644
index 0000000000..07686fefd7
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
+
+import org.apache.spark.Logging 
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.LinkedHashMap
+import scala.collection.mutable.Map
+
+
+/** Client-side methods to set up the Hadoop distributed cache. */
+class ClientDistributedCacheManager() extends Logging {
+  private val distCacheFiles: Map[String, Tuple3[String, String, String]] = 
+    LinkedHashMap[String, Tuple3[String, String, String]]()
+  private val distCacheArchives: Map[String, Tuple3[String, String, String]] = 
+    LinkedHashMap[String, Tuple3[String, String, String]]()
+
+
+  /**
+   * Add a resource to the list of distributed cache resources. This list can
+   * be sent to the ApplicationMaster and possibly the workers so that it can 
+   * be downloaded into the Hadoop distributed cache for use by this application.
+   * Adds the LocalResource to the localResources HashMap passed in and saves 
+   * the stats of the resources so they can be sent to the workers and verified.
+   *
+   * @param fs FileSystem
+   * @param conf Configuration
+   * @param destPath path to the resource
+   * @param localResources localResource hashMap to insert the resource into
+   * @param resourceType LocalResourceType 
+   * @param link link presented in the distributed cache to the destination
+   * @param statCache cache to store the file/directory stats 
+   * @param appMasterOnly Whether to only add the resource to the app master
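+   *
+   * For example, adding "hdfs://nn:8020/user/me/app.jar" (hypothetical path) under
+   * link "app.jar" localizes the file as "app.jar" in the container's working dir.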
+   */
+  def addResource(
+      fs: FileSystem,
+      conf: Configuration,
+      destPath: Path, 
+      localResources: HashMap[String, LocalResource],
+      resourceType: LocalResourceType,
+      link: String,
+      statCache: Map[URI, FileStatus],
+      appMasterOnly: Boolean = false) = {
+    val destStatus = fs.getFileStatus(destPath)
+    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    amJarRsrc.setType(resourceType)
+    val visibility = getVisibility(conf, destPath.toUri(), statCache)
+    amJarRsrc.setVisibility(visibility)
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
+    amJarRsrc.setTimestamp(destStatus.getModificationTime())
+    amJarRsrc.setSize(destStatus.getLen())
+    if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
+    localResources(link) = amJarRsrc
+    
+    if (!appMasterOnly) {
+      val uri = destPath.toUri()
+      val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
+      if (resourceType == LocalResourceType.FILE) {
+        distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(), 
+          destStatus.getModificationTime().toString(), visibility.name())
+      } else {
+        distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(), 
+          destStatus.getModificationTime().toString(), visibility.name())
+      }
+    }
+  }
+
+  /**
+   * Adds the necessary cache file env variables to the env passed in
+   * @param env environment map to add the cache file variables to
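+   *
+   * For example, two cached files yield comma-separated values such as
+   * SPARK_YARN_CACHE_FILES = "hdfs://nn/a#link1,hdfs://nn/b#link2" (hypothetical URIs).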
+   */
+  def setDistFilesEnv(env: Map[String, String]) = {
+    val (keys, tupleValues) = distCacheFiles.unzip
+    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
+
+    if (keys.size > 0) {
+      env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = 
+        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = 
+        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_FILES_VISIBILITIES") = 
+        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+    }
+  }
+
+  /**
+   * Adds the necessary cache archive env variables to the env passed in
+   * @param env environment map to add the cache archive variables to
+   */
+  def setDistArchivesEnv(env: Map[String, String]) = {
+    val (keys, tupleValues) = distCacheArchives.unzip
+    val (sizes, timeStamps, visibilities) = tupleValues.unzip3
+
+    if (keys.size > 0) {
+      env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = 
+        timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
+        sizes.reduceLeft[String] { (acc,n) => acc + "," + n }
+      env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") = 
+        visibilities.reduceLeft[String] { (acc,n) => acc + "," + n }
+    }
+  }
+
+  /**
+   * Returns the local resource visibility depending on the cache file permissions
+   * @param conf Hadoop configuration
+   * @param uri URI of the file to check
+   * @param statCache cache of FileStatus objects
+   * @return LocalResourceVisibility
+   */
+  def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
+      LocalResourceVisibility = {
+    if (isPublic(conf, uri, statCache)) {
+      return LocalResourceVisibility.PUBLIC 
+    } 
+    return LocalResourceVisibility.PRIVATE
+  }
+
+  /**
+   * Returns a boolean to denote whether a cache file is visible to all (public)
+   * or not
+   * @param conf Hadoop configuration
+   * @param uri URI of the file to check
+   * @param statCache cache of FileStatus objects
+   * @return true if the path in the uri is visible to all, false otherwise
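+   *
+   * For example, a world-readable file under a 0700 home directory is not
+   * public, since other users cannot traverse its parent (hypothetical layout).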
+   */
+  def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
+    val fs = FileSystem.get(uri, conf)
+    val current = new Path(uri.getPath())
+    // the leaf-level file must be readable by others
+    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
+      return false
+    }
+    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
+  }
+
+  /**
+   * Returns true if all ancestors of the specified path have the 'execute'
+   * permission set for all users (i.e. that other users can traverse
+   * the directory hierarchy to the given path)
+   * @param fs FileSystem containing the path
+   * @param path directory to check, along with all its ancestors
+   * @param statCache cache of FileStatus objects
+   * @return true if all ancestors have the 'execute' permission set for all users
+   */
+  def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path, 
+      statCache: Map[URI, FileStatus]): Boolean =  {
+    var current = path
+    while (current != null) {
+      // the subdirectories in the path must have execute permission for others
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
+        return false
+      }
+      current = current.getParent()
+    }
+    return true
+  }
+
+  /**
+   * Checks for a given path whether the "other" permissions on it
+   * imply the permission in the passed FsAction
+   * @param fs FileSystem containing the path
+   * @param path path to check
+   * @param action permission to check for
+   * @param statCache cache of FileStatus objects
+   * @return true if the "other" class permissions imply the given action, false otherwise
+   */
+  def checkPermissionOfOther(fs: FileSystem, path: Path,
+      action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
+    val status = getFileStatus(fs, path.toUri(), statCache)
+    val perms = status.getPermission()
+    val otherAction = perms.getOtherAction()
+    otherAction.implies(action)
+  }
+
+  /**
+   * Checks to see if the given uri exists in the cache; if it does, it
+   * returns the existing FileStatus, otherwise it stats the uri, stores
+   * it in the cache, and returns the FileStatus.
+   * @param fs FileSystem containing the uri
+   * @param uri URI to stat
+   * @param statCache cache of FileStatus objects
+   * @return FileStatus
+   */
+  def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
+    val stat = statCache.get(uri) match {
+      case Some(existstat) => existstat
+      case None => 
+        val newStat = fs.getFileStatus(new Path(uri))
+        statCache.put(uri, newStat)
+        newStat
+    }
+    return stat
+  }
+}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index ba352daac4..7a66532254 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -142,11 +142,12 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       rtype: LocalResourceType,
       localResources: HashMap[String, LocalResource],
       timestamp: String,
-      size: String) = {
+      size: String, 
+      vis: String) = {
     val uri = new URI(file)
     val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
     amJarRsrc.setType(rtype)
-    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
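+    // vis is a LocalResourceVisibility name set by the client, e.g. "PRIVATE".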
+    amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
     amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
     amJarRsrc.setTimestamp(timestamp.toLong)
     amJarRsrc.setSize(size.toLong)
@@ -158,44 +159,14 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     logInfo("Preparing Local resources")
     val localResources = HashMap[String, LocalResource]()
     
-    // Spark JAR
-    val sparkJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-    sparkJarResource.setType(LocalResourceType.FILE)
-    sparkJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
-    sparkJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
-      new URI(System.getenv("SPARK_YARN_JAR_PATH"))))
-    sparkJarResource.setTimestamp(System.getenv("SPARK_YARN_JAR_TIMESTAMP").toLong)
-    sparkJarResource.setSize(System.getenv("SPARK_YARN_JAR_SIZE").toLong)
-    localResources("spark.jar") = sparkJarResource
-    // User JAR
-    val userJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-    userJarResource.setType(LocalResourceType.FILE)
-    userJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
-    userJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
-      new URI(System.getenv("SPARK_YARN_USERJAR_PATH"))))
-    userJarResource.setTimestamp(System.getenv("SPARK_YARN_USERJAR_TIMESTAMP").toLong)
-    userJarResource.setSize(System.getenv("SPARK_YARN_USERJAR_SIZE").toLong)
-    localResources("app.jar") = userJarResource
-
-    // Log4j conf - if available
-    if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
-      val log4jConfResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
-      log4jConfResource.setType(LocalResourceType.FILE)
-      log4jConfResource.setVisibility(LocalResourceVisibility.APPLICATION)
-      log4jConfResource.setResource(ConverterUtils.getYarnUrlFromURI(
-        new URI(System.getenv("SPARK_YARN_LOG4J_PATH"))))
-      log4jConfResource.setTimestamp(System.getenv("SPARK_YARN_LOG4J_TIMESTAMP").toLong)
-      log4jConfResource.setSize(System.getenv("SPARK_YARN_LOG4J_SIZE").toLong)
-      localResources("log4j.properties") = log4jConfResource
-    }
-
     if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
       val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
       val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
       val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+      val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
       for( i <- 0 to distFiles.length - 1) {
         setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
-          fileSizes(i))
+          fileSizes(i), visibilities(i))
       }
     }
 
@@ -203,9 +174,10 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
       val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
       val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+      val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
       for( i <- 0 to distArchives.length - 1) {
         setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
-          timeStamps(i), fileSizes(i))
+          timeStamps(i), fileSizes(i), visibilities(i))
       }
     }
     
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
new file mode 100644
index 0000000000..c0a2af0c6f
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+import org.mockito.Mockito.when
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
+
+class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
+
+  class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
+    override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): 
+        LocalResourceVisibility = {
+      return LocalResourceVisibility.PRIVATE
+    }
+  }
+  
+  test("test getFileStatus empty") {
+    val distMgr = new ClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val uri = new URI("/tmp/testing")
+    when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val stat = distMgr.getFileStatus(fs, uri, statCache)
+    assert(stat.getPath() === null)
+  }
+
+  test("test getFileStatus cached") {
+    val distMgr = new ClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val uri = new URI("/tmp/testing")
+    val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner", 
+      null, new Path("/tmp/testing"))
+    when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
+    val stat = distMgr.getFileStatus(fs, uri, statCache)
+    assert(stat.getPath().toString() === "/tmp/testing")
+  }
+
+  test("test addResource") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
+
+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link", 
+      statCache, false)
+    val resource = localResources("link")
+    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+    assert(resource.getTimestamp() === 0)
+    assert(resource.getSize() === 0)
+    assert(resource.getType() === LocalResourceType.FILE)
+
+    val env = new HashMap[String, String]()
+    distMgr.setDistFilesEnv(env)
+    assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
+    assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
+    assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
+
+    distMgr.setDistArchivesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+
+    // add another one and verify both are present and the order is correct
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+      null, new Path("/tmp/testing2"))
+    val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
+    when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
+    distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2", 
+      statCache, false)
+    val resource2 = localResources("link2")
+    assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2)
+    assert(resource2.getTimestamp() === 10)
+    assert(resource2.getSize() === 20)
+    assert(resource2.getType() === LocalResourceType.FILE)
+
+    val env2 = new HashMap[String, String]()
+    distMgr.setDistFilesEnv(env2)
+    val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+    val files = env2("SPARK_YARN_CACHE_FILES").split(',')
+    val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+    val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+    assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(timestamps(0) === "0")
+    assert(sizes(0) === "0")
+    assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
+
+    assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
+    assert(timestamps(1) === "10")
+    assert(sizes(1) === "20")
+    assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
+  }
+
+  test("test addResource link null") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
+
+    intercept[Exception] {
+      distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null, 
+        statCache, false)
+    }
+    assert(localResources.get("link") === None)
+    assert(localResources.size === 0)
+  }
+
+  test("test addResource appmaster only") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+      null, new Path("/tmp/testing"))
+    when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
+
+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", 
+      statCache, true)
+    val resource = localResources("link")
+    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+    assert(resource.getTimestamp() === 10)
+    assert(resource.getSize() === 20)
+    assert(resource.getType() === LocalResourceType.ARCHIVE)
+
+    val env = new HashMap[String, String]()
+    distMgr.setDistFilesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_FILES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+
+    distMgr.setDistArchivesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+  }
+
+  test("test addResource archive") {
+    val distMgr = new MockClientDistributedCacheManager()
+    val fs = mock[FileSystem]
+    val conf = new Configuration()
+    val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val localResources = HashMap[String, LocalResource]()
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", 
+      null, new Path("/tmp/testing"))
+    when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
+
+    distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", 
+      statCache, false)
+    val resource = localResources("link")
+    assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
+    assert(resource.getTimestamp() === 10)
+    assert(resource.getSize() === 20)
+    assert(resource.getType() === LocalResourceType.ARCHIVE)
+
+    val env = new HashMap[String, String]()
+
+    distMgr.setDistArchivesEnv(env)
+    assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
+    assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
+    assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
+
+    distMgr.setDistFilesEnv(env)
+    assert(env.get("SPARK_YARN_CACHE_FILES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
+    assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+  }
+
+}
-- 
GitLab