From 2398e3d69c9a675d651c192107953de8e6c2aecd Mon Sep 17 00:00:00 2001 From: jerryshao <sshao@hortonworks.com> Date: Thu, 28 Apr 2016 16:39:49 -0700 Subject: [PATCH] [SPARK-14836][YARN] Zip all the jars before uploading to distributed cache ## What changes were proposed in this pull request? <copy form JIRA> Currently if neither `spark.yarn.jars` nor `spark.yarn.archive` is set (by default), Spark on yarn code will upload all the jars in the folder separately into distributed cache, this is quite time consuming, and very verbose, instead of upload jars separately into distributed cache, here changes to zip all the jars first, and then put into distributed cache. This will significantly improve the speed of starting time. ## How was this patch tested? Unit test and local integrated test is done. Verified with SparkPi both in spark cluster and client mode. Author: jerryshao <sshao@hortonworks.com> Closes #12597 from jerryshao/SPARK-14836. --- .../org/apache/spark/deploy/yarn/Client.scala | 21 ++++++++++++++++--- .../spark/deploy/yarn/ClientSuite.scala | 11 +++++----- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6184ad591c..b494ef0dd9 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -496,11 +496,26 @@ private[spark] class Client( "to uploading libraries under SPARK_HOME.") val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir( sparkConf.getenv("SPARK_HOME"))) - jarsDir.listFiles().foreach { f => - if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) { - distribute(f.getAbsolutePath(), targetDir = Some(LOCALIZED_LIB_DIR)) + val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip", + new File(Utils.getLocalDir(sparkConf))) + val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive)) + + try { + jarsStream.setLevel(0) + jarsDir.listFiles().foreach { f => + if (f.isFile && f.getName.toLowerCase().endsWith(".jar") && f.canRead) { + jarsStream.putNextEntry(new ZipEntry(f.getName)) + Files.copy(f, jarsStream) + jarsStream.closeEntry() + } } + } finally { + jarsStream.close() } + + distribute(jarsArchive.toURI.getPath, + resType = LocalResourceType.ARCHIVE, + destName = Some(LOCALIZED_LIB_DIR)) } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index f196a0d8ca..a408c48d1d 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -285,8 +285,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath())) val client = createClient(sparkConf) client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(), - anyBoolean(), any()) classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) } @@ -295,13 +293,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val jarsDir = new File(libs, "jars") assert(jarsDir.mkdir()) new FileOutputStream(new File(libs, "RELEASE")).close() - val userLibs = Utils.createTempDir() + val userLib1 = Utils.createTempDir() + val userLib2 = Utils.createTempDir() val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir) - val jar2 = TestUtils.createJarWithFiles(Map(), userLibs) + val jar2 = TestUtils.createJarWithFiles(Map(), userLib1) // Copy jar2 to jar3 with same name val jar3 = { - val target = new File(userLibs, new File(jar1.toURI).getName) + val target = new File(userLib2, new File(jar2.toURI).getName) val input = new FileInputStream(jar2.getPath) val output = new FileOutputStream(target) Utils.copyStream(input, output, closeStreams = true) @@ -315,7 +314,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val tempDir = Utils.createTempDir() client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) - // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar1 will be + // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar2 will be // ignored. sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName))) } -- GitLab