diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 70826ed326ba61260d05d62d876ddbe34076a936..f4f4518480e3635ea364dc3451002acaf7bd22b5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -326,6 +326,7 @@ private[spark] class Client(
       destDir: Path,
       srcPath: Path,
       replication: Short,
+      symlinkCache: Map[URI, Path],
       force: Boolean = false,
       destName: Option[String] = None): Path = {
     val destFs = destDir.getFileSystem(hadoopConf)
@@ -343,8 +344,12 @@ private[spark] class Client(
     // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
     // version shows the specific version in the distributed cache configuration
     val qualifiedDestPath = destFs.makeQualified(destPath)
-    val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf)
-    fc.resolvePath(qualifiedDestPath)
+    val qualifiedDestDir = qualifiedDestPath.getParent
+    val resolvedDestDir = symlinkCache.getOrElseUpdate(qualifiedDestDir.toUri(), {
+      val fc = FileContext.getFileContext(qualifiedDestDir.toUri(), hadoopConf)
+      fc.resolvePath(qualifiedDestDir)
+    })
+    new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
   /**
@@ -400,6 +405,7 @@ private[spark] class Client(
     FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
 
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val symlinkCache: Map[URI, Path] = HashMap[URI, Path]()
 
     def addDistributedUri(uri: URI): Boolean = {
       val uriStr = uri.toString()
@@ -445,7 +451,7 @@ private[spark] class Client(
           val localPath = getQualifiedLocalPath(localURI, hadoopConf)
           val linkname = targetDir.map(_ + "/").getOrElse("") +
             destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
-          val destPath = copyFileToRemote(destDir, localPath, replication)
+          val destPath = copyFileToRemote(destDir, localPath, replication, symlinkCache)
           val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
           distCacheMgr.addResource(
             destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
@@ -497,8 +503,9 @@ private[spark] class Client(
             val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
             val pathFs = FileSystem.get(path.toUri(), hadoopConf)
             pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
-              distribute(entry.getPath().toUri().toString(),
-                targetDir = Some(LOCALIZED_LIB_DIR))
+              val uri = entry.getPath().toUri()
+              statCache.update(uri, entry)
+              distribute(uri.toString(), targetDir = Some(LOCALIZED_LIB_DIR))
             }
           } else {
             localJars += jar
@@ -614,7 +621,7 @@ private[spark] class Client(
     sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
 
     val localConfArchive = new Path(createConfArchive().toURI())
-    copyFileToRemote(destDir, localConfArchive, replication, force = true,
+    copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, force = true,
       destName = Some(LOCALIZED_CONF_ARCHIVE))
 
     // Manually add the config archive to the cache manager so that the AM is launched with
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index dcc2288dd155aa742384a3a849fad7fe22e0313d..e6e0ea38ade9433ca418e0094fd3cf34d085938d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -68,7 +68,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
       link: String,
       statCache: Map[URI, FileStatus],
       appMasterOnly: Boolean = false): Unit = {
-    val destStatus = fs.getFileStatus(destPath)
+    val destStatus = statCache.getOrElse(destPath.toUri(), fs.getFileStatus(destPath))
     val amJarRsrc = Records.newRecord(classOf[LocalResource])
     amJarRsrc.setType(resourceType)
     val visibility = getVisibility(conf, destPath.toUri(), statCache)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index dd2180a0f5e1e58cdd29e1a383a3b4e73973e6f3..3a11787aa57dcf687cf7a28d99e08173104f63d0 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -134,7 +134,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
       .set("spark.yarn.dist.jars", ADDED)
     val client = createClient(sparkConf, args = Array("--jar", USER))
     doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-      any(classOf[Path]), anyShort(), anyBoolean(), any())
+      any(classOf[Path]), anyShort(), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
 
     val tempDir = Utils.createTempDir()
     try {
@@ -240,11 +240,11 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
       Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
 
     verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort(),
-      anyBoolean(), any())
+      any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
     verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort(),
-      anyBoolean(), any())
+      any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
     verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort(),
-      anyBoolean(), any())
+      any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
 
     val cp = classpath(client)
     cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
@@ -262,7 +262,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
 
     verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort(),
-      anyBoolean(), any())
+      any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
 
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
     sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
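For context, the core optimization in this patch is memoizing per-directory symlink resolution: many distributed files share the same staging directory, so the patched `copyFileToRemote` resolves each distinct parent directory once via `symlinkCache.getOrElseUpdate` and re-attaches the file name, instead of calling `FileContext.resolvePath` per file (a NameNode round trip each time); the `statCache` change in `ClientDistributedCacheManager` applies the same idea to `getFileStatus`. Below is a minimal, self-contained sketch of the memoization pattern only. `SymlinkCacheSketch`, `expensiveResolve`, the example URIs, and the call counter are illustrative stand-ins, not code from the patch.

```scala
import scala.collection.mutable

object SymlinkCacheSketch {
  private var resolveCalls = 0

  // Illustrative stand-in for FileContext.resolvePath, which in the real
  // client costs one NameNode round trip per call.
  private def expensiveResolve(dir: String): String = {
    resolveCalls += 1
    dir.replace("/current", "/v1.2.3") // pretend "current" is a symlink
  }

  def main(args: Array[String]): Unit = {
    val symlinkCache = mutable.HashMap[String, String]()
    val files = Seq(
      "hdfs:///apps/current/a.jar",
      "hdfs:///apps/current/b.jar",
      "hdfs:///apps/current/c.jar")

    // Resolve each distinct parent directory once, then re-attach the file
    // name -- the same shape as the patched copyFileToRemote.
    val resolved = files.map { f =>
      val (parent, name) = f.splitAt(f.lastIndexOf('/'))
      symlinkCache.getOrElseUpdate(parent, expensiveResolve(parent)) + name
    }

    resolved.foreach(println)
    println(s"resolve calls: $resolveCalls") // prints 1, not 3
  }
}
```

Note that `getOrElseUpdate` evaluates its second argument only on a cache miss, which is why the expensive resolution runs once per distinct directory rather than once per file.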