diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c9bc01cba510d4880e02a2de4fd8ee400f658c99..158197ae4d02239cf3afb4da01a7d017118a154e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -683,7 +683,7 @@ class SparkContext( /** * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported - * filesystems), or an HTTP, HTTPS or FTP URI. + * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. */ def addJar(path: String) { if (path == null) { @@ -696,6 +696,7 @@ class SparkContext( } else { val uri = new URI(path) key = uri.getScheme match { + // A JAR file which exists only on the driver node case null | "file" => if (env.hadoop.isYarnMode()) { // In order for this to work on yarn the user must specify the --addjars option to @@ -713,6 +714,9 @@ class SparkContext( } else { env.httpFileServer.addJar(new File(uri.getPath)) } + // A JAR file which exists locally on every worker node + case "local" => + "file:" + uri.getPath case _ => path } diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index 35d1d41af175b419f1fa18cbff60cdc5bf98eeff..c210dd5c3b4e2093aba8b1fcb522dc173041b750 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -120,4 +120,20 @@ class FileServerSuite extends FunSuite with LocalSparkContext { }.collect() assert(result.toSet === Set((1,2), (2,7), (3,121))) } + + test ("Dynamically adding JARS on a standalone cluster using local: URL") { + sc = new SparkContext("local-cluster[1,1,512]", "test") + val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() + sc.addJar(sampleJarFile.replace("file", "local")) + val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) + val result = sc.parallelize(testData).reduceByKey { (x,y) => + val fac = Thread.currentThread.getContextClassLoader() + .loadClass("org.uncommons.maths.Maths") + .getDeclaredMethod("factorial", classOf[Int]) + val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt + val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt + a + b + }.collect() + assert(result.toSet === Set((1,2), (2,7), (3,121))) + } }