diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6184ad591c1d2abd78082aa82ddcbad903fb95da..b494ef0dd96658e9234585edf5b8c952a96f4fc4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -496,11 +496,26 @@ private[spark] class Client( "to uploading libraries under SPARK_HOME.") val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir( sparkConf.getenv("SPARK_HOME"))) - jarsDir.listFiles().foreach { f => - if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) { - distribute(f.getAbsolutePath(), targetDir = Some(LOCALIZED_LIB_DIR)) + val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip", + new File(Utils.getLocalDir(sparkConf))) + val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive)) + + try { + jarsStream.setLevel(0) + jarsDir.listFiles().foreach { f => + if (f.isFile && f.getName.toLowerCase().endsWith(".jar") && f.canRead) { + jarsStream.putNextEntry(new ZipEntry(f.getName)) + Files.copy(f, jarsStream) + jarsStream.closeEntry() + } } + } finally { + jarsStream.close() } + + distribute(jarsArchive.toURI.getPath, + resType = LocalResourceType.ARCHIVE, + destName = Some(LOCALIZED_LIB_DIR)) } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index f196a0d8ca32bbacaafba78f8f86b4e5372fc913..a408c48d1d231ea6925b67b327b5d54d5059b748 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -285,8 +285,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath())) val client = createClient(sparkConf) client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(), - anyBoolean(), any()) classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) } @@ -295,13 +293,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val jarsDir = new File(libs, "jars") assert(jarsDir.mkdir()) new FileOutputStream(new File(libs, "RELEASE")).close() - val userLibs = Utils.createTempDir() + val userLib1 = Utils.createTempDir() + val userLib2 = Utils.createTempDir() val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir) - val jar2 = TestUtils.createJarWithFiles(Map(), userLibs) + val jar2 = TestUtils.createJarWithFiles(Map(), userLib1) // Copy jar2 to jar3 with same name val jar3 = { - val target = new File(userLibs, new File(jar1.toURI).getName) + val target = new File(userLib2, new File(jar2.toURI).getName) val input = new FileInputStream(jar2.getPath) val output = new FileOutputStream(target) Utils.copyStream(input, output, closeStreams = true) @@ -315,7 +314,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val tempDir = Utils.createTempDir() client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) - // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar1 will be + // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar2 will be // ignored. sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName))) }