diff --git a/R/install-dev.sh b/R/install-dev.sh
index 4972bb9217072a58acb7525008cb31253345c23c..59d98c9c7a6467832f0dbf73da0dc330510407a6 100755
--- a/R/install-dev.sh
+++ b/R/install-dev.sh
@@ -42,8 +42,4 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
 # Install SparkR to $LIB_DIR
 R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
 
-# Zip the SparkR package so that it can be distributed to worker nodes on YARN
-cd $LIB_DIR
-jar cfM "$LIB_DIR/sparkr.zip" SparkR
-
 popd > /dev/null
diff --git a/R/pkg/inst/tests/packageInAJarTest.R b/R/pkg/inst/tests/packageInAJarTest.R
new file mode 100644
index 0000000000000000000000000000000000000000..207a37a0cb47f3f82f54582d5bde87cd0b555de8
--- /dev/null
+++ b/R/pkg/inst/tests/packageInAJarTest.R
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+library(SparkR)
+library(sparkPackageTest)
+
+sc <- sparkR.init()
+
+run1 <- myfunc(5L)
+
+run2 <- myfunc(-4L)
+
+sparkR.stop()
+
+if(run1 != 6) quit(save = "no", status = 1)
+
+if(run2 != -3) quit(save = "no", status = 1)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index d53abd3408c5506959a63de443acf5da52dfac91..93b3bea578676f6e47178899a8f72186b3367a95 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.api.r
 
 import java.io.File
 
+import scala.collection.JavaConversions._
+
 import org.apache.spark.{SparkEnv, SparkException}
 
 private[spark] object RUtils {
@@ -26,7 +28,7 @@ private[spark] object RUtils {
    * Get the SparkR package path in the local spark distribution.
    */
  def localSparkRPackagePath: Option[String] = {
-    val sparkHome = sys.env.get("SPARK_HOME")
+    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.test.home"))
     sparkHome.map(
       Seq(_, "R", "lib").mkString(File.separator)
     )
@@ -46,8 +48,8 @@ private[spark] object RUtils {
       (sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode"))
     }
 
-    val isYarnCluster = master.contains("yarn") && deployMode == "cluster"
-    val isYarnClient = master.contains("yarn") && deployMode == "client"
+    val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster"
+    val isYarnClient = master != null && master.contains("yarn") && deployMode == "client"
 
     // In YARN mode, the SparkR package is distributed as an archive symbolically
     // linked to the "sparkr" file in the current directory. Note that this does not apply
@@ -62,4 +64,10 @@ private[spark] object RUtils {
       }
     }
   }
+
+  /** Check if R is installed before running tests that use R commands. */
+  def isRInstalled: Boolean = {
+    val builder = new ProcessBuilder(Seq("R", "--version"))
+    builder.start().waitFor() == 0
+  }
 }
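The new RUtils.isRInstalled helper shells out to `R --version` and treats a zero exit code as "R is available". Suites that need a local R installation can guard on it with ScalaTest's assume, so such tests are cancelled rather than failed on machines without R. A minimal sketch of that pattern (the suite and test names here are illustrative):

    import org.apache.spark.SparkFunSuite
    import org.apache.spark.api.r.RUtils

    class RDependentSuite extends SparkFunSuite {
      test("does something that invokes the R command") {
        // Cancels the test instead of failing it when R is not on the PATH.
        assume(RUtils.isRInstalled, "R isn't installed on this machine.")
        // ... test body that shells out to R ...
      }
    }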
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ed1e972955679cb44cf2bede7a18856b33942f30
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io._
+import java.util.jar.JarFile
+import java.util.logging.Level
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.JavaConversions._
+
+import com.google.common.io.{ByteStreams, Files}
+
+import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.api.r.RUtils
+import org.apache.spark.util.{RedirectThread, Utils}
+
+private[deploy] object RPackageUtils extends Logging {
+
+  /** The key in the MANIFEST.mf that we look for, in case a jar contains R code. */
+  private final val hasRPackage = "Spark-HasRPackage"
+
+  /** Base of the shell command used in order to install R packages. */
+  private final val baseInstallCmd = Seq("R", "CMD", "INSTALL", "-l")
+
+  /** R source code should exist under R/pkg in a jar. */
+  private final val RJarEntries = "R/pkg"
+
+  /** Documentation on how the R source file layout should be in the jar. */
+  private[deploy] final val RJarDoc =
+    s"""In order for Spark to build R packages that are parts of Spark Packages, there are a few
+      |requirements. The R source code must be shipped in a jar, with additional Java/Scala
+      |classes. The jar must be in the following format:
+      |  1- The Manifest (META-INF/MANIFEST.mf) must contain the key-value: $hasRPackage: true
+      |  2- The standard R package layout must be preserved under R/pkg/ inside the jar. More
+      |  information on the standard R package layout can be found in:
+      |  http://cran.r-project.org/doc/contrib/Leisch-CreatingPackages.pdf
+      |  An example layout is given below. After running `jar tf $$JAR_FILE | sort`:
+      |
+      |META-INF/MANIFEST.MF
+      |R/
+      |R/pkg/
+      |R/pkg/DESCRIPTION
+      |R/pkg/NAMESPACE
+      |R/pkg/R/
+      |R/pkg/R/myRcode.R
+      |org/
+      |org/apache/
+      |...
+      """.stripMargin.trim
+
+  /** Internal method for logging. We log to a printStream in tests, for debugging purposes. */
+  private def print(
+      msg: String,
+      printStream: PrintStream,
+      level: Level = Level.FINE,
+      e: Throwable = null): Unit = {
+    if (printStream != null) {
+      // scalastyle:off println
+      printStream.println(msg)
+      // scalastyle:on println
+      if (e != null) {
+        e.printStackTrace(printStream)
+      }
+    } else {
+      level match {
+        case Level.INFO => logInfo(msg)
+        case Level.WARNING => logWarning(msg)
+        case Level.SEVERE => logError(msg, e)
+        case _ => logDebug(msg)
+      }
+    }
+  }
+
+  /**
+   * Checks the manifest of the Jar whether there is any R source code bundled with it.
+   * Exposed for testing.
+   */
+  private[deploy] def checkManifestForR(jar: JarFile): Boolean = {
+    val manifest = jar.getManifest.getMainAttributes
+    manifest.getValue(hasRPackage) != null && manifest.getValue(hasRPackage).trim == "true"
+  }
+
+  /**
+   * Runs the standard R package installation code to build the R package from source.
+   * Multiple runs don't cause problems.
+   */
+  private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = {
+    // this code should be always running on the driver.
+    val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse(
+      throw new SparkException("SPARK_HOME not set. Can't locate SparkR package."))
+    val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator)
+    val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg)
+    if (verbose) {
+      print(s"Building R package with the command: $installCmd", printStream)
+    }
+    try {
+      val builder = new ProcessBuilder(installCmd)
+      builder.redirectErrorStream(true)
+      val env = builder.environment()
+      env.clear()
+      val process = builder.start()
+      new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start()
+      process.waitFor() == 0
+    } catch {
+      case e: Throwable =>
+        print("Failed to build R package.", printStream, Level.SEVERE, e)
+        false
+    }
+  }
+
+  /**
+   * Extracts the files under /R in the jar to a temporary directory for building.
+   */
+  private def extractRFolder(jar: JarFile, printStream: PrintStream, verbose: Boolean): File = {
+    val tempDir = Utils.createTempDir(null)
+    val jarEntries = jar.entries()
+    while (jarEntries.hasMoreElements) {
+      val entry = jarEntries.nextElement()
+      val entryRIndex = entry.getName.indexOf(RJarEntries)
+      if (entryRIndex > -1) {
+        val entryPath = entry.getName.substring(entryRIndex)
+        if (entry.isDirectory) {
+          val dir = new File(tempDir, entryPath)
+          if (verbose) {
+            print(s"Creating directory: $dir", printStream)
+          }
+          dir.mkdirs
+        } else {
+          val inStream = jar.getInputStream(entry)
+          val outPath = new File(tempDir, entryPath)
+          Files.createParentDirs(outPath)
+          val outStream = new FileOutputStream(outPath)
+          if (verbose) {
+            print(s"Extracting $entry to $outPath", printStream)
+          }
+          Utils.copyStream(inStream, outStream, closeStreams = true)
+        }
+      }
+    }
+    tempDir
+  }
+
+  /**
+   * Checks the given jars for bundled R source code and, when found, builds and installs the R package from it.
+   */
+  private[deploy] def checkAndBuildRPackage(
+      jars: String,
+      printStream: PrintStream = null,
+      verbose: Boolean = false): Unit = {
+    jars.split(",").foreach { jarPath =>
+      val file = new File(Utils.resolveURI(jarPath))
+      if (file.exists()) {
+        val jar = new JarFile(file)
+        if (checkManifestForR(jar)) {
+          print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
+          val rSource = extractRFolder(jar, printStream, verbose)
+          try {
+            if (!rPackageBuilder(rSource, printStream, verbose)) {
+              print(s"ERROR: Failed to build R package in $file.", printStream)
+              print(RJarDoc, printStream)
+            }
+          } finally {
+            rSource.delete() // clean up
+          }
+        } else {
+          if (verbose) {
+            print(s"$file doesn't contain R source code, skipping...", printStream)
+          }
+        }
+      } else {
+        print(s"WARN: $file resolved as dependency, but not found.", printStream, Level.WARNING)
+      }
+    }
+  }
+
+  private def listFilesRecursively(dir: File, excludePatterns: Seq[String]): Set[File] = {
+    if (!dir.exists()) {
+      Set.empty[File]
+    } else {
+      if (dir.isDirectory) {
+        val subDir = dir.listFiles(new FilenameFilter {
+          override def accept(dir: File, name: String): Boolean = {
+            !excludePatterns.map(name.contains).reduce(_ || _) // exclude files with given pattern
+          }
+        })
+        subDir.flatMap(listFilesRecursively(_, excludePatterns)).toSet
+      } else {
+        Set(dir)
+      }
+    }
+  }
+
+  /** Zips all the libraries found with SparkR in the R/lib directory for distribution with Yarn. */
+  private[deploy] def zipRLibraries(dir: File, name: String): File = {
+    val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
+    // create a zip file from scratch, do not append to existing file.
+    val zipFile = new File(dir, name)
+    zipFile.delete()
+    val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
+    try {
+      filesToBundle.foreach { file =>
+        // get the relative paths for proper naming in the zip file
+        val relPath = file.getAbsolutePath.replaceFirst(dir.getAbsolutePath, "")
+        val fis = new FileInputStream(file)
+        val zipEntry = new ZipEntry(relPath)
+        zipOutputStream.putNextEntry(zipEntry)
+        ByteStreams.copy(fis, zipOutputStream)
+        zipOutputStream.closeEntry()
+        fis.close()
+      }
+    } finally {
+      zipOutputStream.close()
+    }
+    zipFile
+  }
+}
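RPackageUtils recognizes R-bearing jars purely by the Spark-HasRPackage manifest attribute and the R/pkg layout described in RJarDoc above. A rough sketch of producing such a jar with the plain java.util.jar API (entry names follow the documented layout; file names and contents are illustrative and omitted here):

    import java.io.FileOutputStream
    import java.util.jar.{Attributes, JarEntry, JarOutputStream, Manifest}

    val manifest = new Manifest()
    val attrs = manifest.getMainAttributes
    attrs.put(Attributes.Name.MANIFEST_VERSION, "1.0") // without this, main attributes are not written
    attrs.put(new Attributes.Name("Spark-HasRPackage"), "true")

    val out = new JarOutputStream(new FileOutputStream("mylib.jar"), manifest)
    // The standard R package layout must sit under R/pkg/ so extractRFolder can find it.
    Seq("R/pkg/DESCRIPTION", "R/pkg/NAMESPACE", "R/pkg/R/myRcode.R").foreach { entry =>
      out.putNextEntry(new JarEntry(entry))
      // ... write the file bytes for this entry here ...
      out.closeEntry()
    }
    out.close()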
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 31185c8e77def912c8b8fdcb2ac74f4a0bd94566..1186bed4852502e870307ae2680b7f6a1ba7224e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -292,6 +292,12 @@ object SparkSubmit {
       }
     }
 
+    // install any R packages that may have been passed through --jars or --packages.
+    // Spark Packages may contain R source code inside the jar.
+    if (args.isR && !StringUtils.isBlank(args.jars)) {
+      RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+    }
+
     // Require all python files to be local, so we can add them to the PYTHONPATH
     // In YARN cluster mode, python files are distributed as regular files, which can be non-local
     if (args.isPython && !isYarnCluster) {
@@ -361,7 +367,8 @@ object SparkSubmit {
       if (rPackagePath.isEmpty) {
         printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
       }
-      val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+      val rPackageFile =
+        RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
       if (!rPackageFile.exists()) {
         printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
       }
@@ -987,11 +994,9 @@ private[spark] object SparkSubmitUtils {
     addExclusionRules(ivySettings, ivyConfName, md)
     // add all supplied maven artifacts as dependencies
     addDependenciesToIvy(md, artifacts, ivyConfName)
-
     exclusions.foreach { e =>
       md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
     }
-
     // resolve dependencies
     val rr: ResolveReport = ivy.resolve(md, resolveOptions)
     if (rr.hasError) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 44852ce4e84ac76103ebfe3f2e225bad0e129f59..3f3c6627c21fbcc5bc8f651715119f7485b28833 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -611,5 +611,4 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       System.setErr(currentErr)
     }
   }
-
 }
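With the install-dev.sh change above, sparkr.zip is no longer produced at build time; for YARN it is now created on demand by SparkSubmit through RPackageUtils.zipRLibraries, which bundles everything under the R/lib directory except pre-existing .zip files. Both helpers are private to their packages, so the following is only an illustrative sketch of the equivalent call, assuming SPARK_HOME is set:

    import java.io.File
    import org.apache.spark.api.r.RUtils
    import org.apache.spark.deploy.RPackageUtils

    // Resolves to <SPARK_HOME>/R/lib (or spark.test.home in tests).
    val rLibDir = RUtils.localSparkRPackagePath.getOrElse(sys.error("SPARK_HOME is not set"))
    // Re-zips the installed SparkR libraries into R/lib/sparkr.zip for shipping to YARN containers.
    val archive = RPackageUtils.zipRLibraries(new File(rLibDir), "sparkr.zip")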
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index 823050b0aabbecf04c51705ecb1a03189f92a335..d93febcfd23fd177580c41c9f96e6134377354a2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -19,6 +19,10 @@ package org.apache.spark.deploy
 
 import java.io.{File, FileInputStream, FileOutputStream}
 import java.util.jar.{JarEntry, JarOutputStream}
+import java.util.jar.Attributes.Name
+import java.util.jar.Manifest
+
+import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.io.{Files, ByteStreams}
 
@@ -35,7 +39,7 @@ private[deploy] object IvyTestUtils {
    * Create the path for the jar and pom from the maven coordinate. Extension should be `jar`
    * or `pom`.
    */
-  private def pathFromCoordinate(
+  private[deploy] def pathFromCoordinate(
       artifact: MavenCoordinate,
       prefix: File,
       ext: String,
@@ -52,7 +56,7 @@ private[deploy] object IvyTestUtils {
   }
 
   /** Returns the artifact naming based on standard ivy or maven format. */
-  private def artifactName(
+  private[deploy] def artifactName(
       artifact: MavenCoordinate,
       useIvyLayout: Boolean,
       ext: String = ".jar"): String = {
@@ -73,7 +77,7 @@ private[deploy] object IvyTestUtils {
   }
 
   /** Write the contents to a file to the supplied directory. */
-  private def writeFile(dir: File, fileName: String, contents: String): File = {
+  private[deploy] def writeFile(dir: File, fileName: String, contents: String): File = {
     val outputFile = new File(dir, fileName)
     val outputStream = new FileOutputStream(outputFile)
     outputStream.write(contents.toCharArray.map(_.toByte))
@@ -90,6 +94,42 @@ private[deploy] object IvyTestUtils {
     writeFile(dir, "mylib.py", contents)
   }
 
+  /** Create an example R package that calls the given Java class. */
+  private def createRFiles(
+      dir: File,
+      className: String,
+      packageName: String): Seq[(String, File)] = {
+    val rFilesDir = new File(dir, "R" + File.separator + "pkg")
+    Files.createParentDirs(new File(rFilesDir, "R" + File.separator + "mylib.R"))
+    val contents =
+      s"""myfunc <- function(x) {
+        |  SparkR:::callJStatic("$packageName.$className", "myFunc", x)
+        |}
+      """.stripMargin
+    val source = writeFile(new File(rFilesDir, "R"), "mylib.R", contents)
+    val description =
+      """Package: sparkPackageTest
+        |Type: Package
+        |Title: Test for building an R package
+        |Version: 0.1
+        |Date: 2015-07-08
+        |Author: Burak Yavuz
+        |Imports: methods, SparkR
+        |Depends: R (>= 3.1), methods, SparkR
+        |Suggests: testthat
+        |Description: Test for building an R package within a jar
+        |License: Apache License (== 2.0)
+        |Collate: 'mylib.R'
+      """.stripMargin
+    val descFile = writeFile(rFilesDir, "DESCRIPTION", description)
+    val namespace =
+      """import(SparkR)
+        |export("myfunc")
+      """.stripMargin
+    val nameFile = writeFile(rFilesDir, "NAMESPACE", namespace)
+    Seq(("R/pkg/R/mylib.R", source), ("R/pkg/DESCRIPTION", descFile), ("R/pkg/NAMESPACE", nameFile))
+  }
+
   /** Create a simple testable Class. */
   private def createJavaClass(dir: File, className: String, packageName: String): File = {
     val contents =
@@ -97,17 +137,14 @@ private[deploy] object IvyTestUtils {
       |
       |import java.lang.Integer;
       |
-      |class $className implements java.io.Serializable {
-      |
-      | public $className() {}
-      |
-      | public Integer myFunc(Integer x) {
+      |public class $className implements java.io.Serializable {
+      | public static Integer myFunc(Integer x) {
       |   return x + 1;
       | }
       |}
     """.stripMargin
     val sourceFile =
-      new JavaSourceFromString(new File(dir, className + ".java").getAbsolutePath, contents)
+      new JavaSourceFromString(new File(dir, className).getAbsolutePath, contents)
     createCompiledClass(className, dir, sourceFile, Seq.empty)
   }
 
@@ -199,14 +236,25 @@ private[deploy] object IvyTestUtils {
   }
 
   /** Create the jar for the given maven coordinate, using the supplied files. */
-  private def packJar(
+  private[deploy] def packJar(
       dir: File,
       artifact: MavenCoordinate,
       files: Seq[(String, File)],
-      useIvyLayout: Boolean): File = {
+      useIvyLayout: Boolean,
+      withR: Boolean,
+      withManifest: Option[Manifest] = None): File = {
     val jarFile = new File(dir, artifactName(artifact, useIvyLayout))
     val jarFileStream = new FileOutputStream(jarFile)
-    val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
+    val manifest = withManifest.getOrElse {
+      val mani = new Manifest()
+      if (withR) {
+        val attr = mani.getMainAttributes
+        attr.put(Name.MANIFEST_VERSION, "1.0")
+        attr.put(new Name("Spark-HasRPackage"), "true")
+      }
+      mani
+    }
+    val jarStream = new JarOutputStream(jarFileStream, manifest)
 
     for (file <- files) {
       val jarEntry = new JarEntry(file._1)
@@ -239,7 +287,8 @@ private[deploy] object IvyTestUtils {
       dependencies: Option[Seq[MavenCoordinate]] = None,
       tempDir: Option[File] = None,
       useIvyLayout: Boolean = false,
-      withPython: Boolean = false): File = {
+      withPython: Boolean = false,
+      withR: Boolean = false): File = {
     // Where the root of the repository exists, and what Ivy will search in
     val tempPath = tempDir.getOrElse(Files.createTempDir())
     // Create directory if it doesn't exist
@@ -255,14 +304,16 @@ private[deploy] object IvyTestUtils {
     val javaClass = createJavaClass(root, className, artifact.groupId)
     // A tuple of files representation in the jar, and the file
     val javaFile = (artifact.groupId.replace(".", "/") + "/" + javaClass.getName, javaClass)
-    val allFiles =
-      if (withPython) {
-        val pythonFile = createPythonFile(root)
-        Seq(javaFile, (pythonFile.getName, pythonFile))
-      } else {
-        Seq(javaFile)
-      }
-    val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout)
+    val allFiles = ArrayBuffer[(String, File)](javaFile)
+    if (withPython) {
+      val pythonFile = createPythonFile(root)
+      allFiles.append((pythonFile.getName, pythonFile))
+    }
+    if (withR) {
+      val rFiles = createRFiles(root, className, artifact.groupId)
+      allFiles.append(rFiles: _*)
+    }
+    val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout, withR)
     assert(jarFile.exists(), "Problem creating Jar file")
     val descriptor = createDescriptor(tempPath, artifact, dependencies, useIvyLayout)
     assert(descriptor.exists(), "Problem creating Pom file")
@@ -286,9 +337,10 @@ private[deploy] object IvyTestUtils {
       dependencies: Option[String],
       rootDir: Option[File],
       useIvyLayout: Boolean = false,
-      withPython: Boolean = false): File = {
+      withPython: Boolean = false,
+      withR: Boolean = false): File = {
     val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
-    val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython)
+    val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython, withR)
     deps.foreach { seq => seq.foreach { dep =>
       createLocalRepository(dep, None, Some(mainRepo), useIvyLayout, withPython = false)
     }}
@@ -311,11 +363,12 @@ private[deploy] object IvyTestUtils {
       rootDir: Option[File],
       useIvyLayout: Boolean = false,
       withPython: Boolean = false,
+      withR: Boolean = false,
       ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = {
     val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
     purgeLocalIvyCache(artifact, deps, ivySettings)
     val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout,
-      withPython)
+      withPython, withR)
     try {
       f(repo.toURI.toString)
     } finally {
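The test-support changes above let a suite stand up a throwaway Maven-style repository whose jar also carries an R package: withR = true turns on both createRFiles and the Spark-HasRPackage manifest attribute in packJar. The basic usage pattern, with illustrative coordinates, looks roughly like this:

    import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate

    val coord = MavenCoordinate("my.great.lib", "mylib", "0.1")
    IvyTestUtils.withRepository(coord, dependencies = None, rootDir = None, withR = true) { repoUri =>
      // repoUri can be handed to --repositories while coord.toString goes to --packages;
      // the local Ivy cache for the coordinate is purged before the body runs.
    }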
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..47a64081e297e79831cb97faee4b6987d29cf5d8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.{PrintStream, OutputStream, File}
+import java.net.URI
+import java.util.jar.Attributes.Name
+import java.util.jar.{JarFile, Manifest}
+import java.util.zip.{ZipEntry, ZipFile}
+
+import org.scalatest.BeforeAndAfterEach
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.api.r.RUtils
+import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+
+class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  private val main = MavenCoordinate("a", "b", "c")
+  private val dep1 = MavenCoordinate("a", "dep1", "c")
+  private val dep2 = MavenCoordinate("a", "dep2", "d")
+
+  private def getJarPath(coord: MavenCoordinate, repo: File): File = {
+    new File(IvyTestUtils.pathFromCoordinate(coord, repo, "jar", useIvyLayout = false),
+      IvyTestUtils.artifactName(coord, useIvyLayout = false, ".jar"))
+  }
+
+  private val lineBuffer = ArrayBuffer[String]()
+
+  private val noOpOutputStream = new OutputStream {
+    def write(b: Int) = {}
+  }
+
+  /** Simple PrintStream that reads data into a buffer */
+  private class BufferPrintStream extends PrintStream(noOpOutputStream) {
+    // scalastyle:off println
+    override def println(line: String) {
+      // scalastyle:on println
+      lineBuffer += line
+    }
+  }
+
+  def beforeAll() {
+    System.setProperty("spark.testing", "true")
+  }
+
+  override def beforeEach(): Unit = {
+    lineBuffer.clear()
+  }
+
+  test("pick which jars to unpack using the manifest") {
+    val deps = Seq(dep1, dep2).mkString(",")
+    IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+      val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo)))))
+      assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
+      assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
+      assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+    }
+  }
+
+  test("build an R package from a jar end to end") {
+    assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+    val deps = Seq(dep1, dep2).mkString(",")
+    IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+      val jars = Seq(main, dep1, dep2).map { c =>
+        getJarPath(c, new File(new URI(repo)))
+      }.mkString(",")
+      RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true)
+      val firstJar = jars.substring(0, jars.indexOf(","))
+      val output = lineBuffer.mkString("\n")
+      assert(output.contains("Building R package"))
+      assert(output.contains("Extracting"))
+      assert(output.contains(s"$firstJar contains R source code. Now installing package."))
+      assert(output.contains("doesn't contain R source code, skipping..."))
+    }
+  }
+
+  test("jars that don't exist are skipped and print warning") {
+    assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+    val deps = Seq(dep1, dep2).mkString(",")
+    IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+      val jars = Seq(main, dep1, dep2).map { c =>
+        getJarPath(c, new File(new URI(repo))) + "dummy"
+      }.mkString(",")
+      RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true)
+      val individualJars = jars.split(",")
+      val output = lineBuffer.mkString("\n")
+      individualJars.foreach { jarFile =>
+        assert(output.contains(s"$jarFile"))
+      }
+    }
+  }
+
+  test("faulty R package shows documentation") {
+    assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+    IvyTestUtils.withRepository(main, None, None) { repo =>
+      val manifest = new Manifest
+      val attr = manifest.getMainAttributes
+      attr.put(Name.MANIFEST_VERSION, "1.0")
+      attr.put(new Name("Spark-HasRPackage"), "true")
+      val jar = IvyTestUtils.packJar(new File(new URI(repo)), dep1, Nil,
+        useIvyLayout = false, withR = false, Some(manifest))
+      RPackageUtils.checkAndBuildRPackage(jar.getAbsolutePath, new BufferPrintStream,
+        verbose = true)
+      val output = lineBuffer.mkString("\n")
+      assert(output.contains(RPackageUtils.RJarDoc))
+    }
+  }
+
+  test("SparkR zipping works properly") {
+    val tempDir = Files.createTempDir()
+    try {
+      IvyTestUtils.writeFile(tempDir, "test.R", "abc")
+      val fakeSparkRDir = new File(tempDir, "SparkR")
+      assert(fakeSparkRDir.mkdirs())
+      IvyTestUtils.writeFile(fakeSparkRDir, "abc.R", "abc")
+      IvyTestUtils.writeFile(fakeSparkRDir, "DESCRIPTION", "abc")
+      IvyTestUtils.writeFile(tempDir, "package.zip", "abc") // fake zip file :)
+      val fakePackageDir = new File(tempDir, "packageTest")
+      assert(fakePackageDir.mkdirs())
+      IvyTestUtils.writeFile(fakePackageDir, "def.R", "abc")
+      IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
+      val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
+      assert(finalZip.exists())
+      val entries = new ZipFile(finalZip).entries().toSeq.map(_.getName)
+      assert(entries.contains("/test.R"))
+      assert(entries.contains("/SparkR/abc.R"))
+      assert(entries.contains("/SparkR/DESCRIPTION"))
+      assert(!entries.contains("/package.zip"))
+      assert(entries.contains("/packageTest/def.R"))
+      assert(entries.contains("/packageTest/DESCRIPTION"))
+    } finally {
+      FileUtils.deleteDirectory(tempDir)
+    }
+  }
+}
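The end-to-end flow the suites target pairs a JVM class with a thin R wrapper generated by createRFiles: the wrapper reaches the JVM through SparkR:::callJStatic("<package>.<class>", "myFunc", x), so the class only needs to expose a static myFunc. In Scala, the JVM half of such a package could plausibly be written as a top-level object, whose methods Scala exposes as static forwarders; the names below mirror the test fixtures but are otherwise illustrative:

    package my.great.lib

    // SparkR:::callJStatic("my.great.lib.MyLib", "myFunc", 5L) should resolve this method
    // through the static forwarder generated for the top-level object.
    object MyLib {
      def myFunc(x: Int): Int = x + 1
    }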
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index aa78bfe30974cd867ec81c9ca1336a3e2bde94b9..757e0ce3d278bf61ddac290e0ccc49366a776780 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -362,6 +362,30 @@ class SparkSubmitSuite
     }
   }
 
+  test("correctly builds R packages included in a jar with --packages") {
+    // TODO(SPARK-9603): Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
+    // It's hard to write the test in SparkR (because we can't create the repository dynamically)
+    /*
+    assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+    val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
+    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+    val rScriptDir =
+      Seq(sparkHome, "R", "pkg", "inst", "tests", "packageInAJarTest.R").mkString(File.separator)
+    assert(new File(rScriptDir).exists)
+    IvyTestUtils.withRepository(main, None, None, withR = true) { repo =>
+      val args = Seq(
+        "--name", "testApp",
+        "--master", "local-cluster[2,1,1024]",
+        "--packages", main.toString,
+        "--repositories", repo,
+        "--verbose",
+        "--conf", "spark.ui.enabled=false",
+        rScriptDir)
+      runSparkSubmit(args)
+    }
+    */
+  }
+
   test("resolves command line argument paths correctly") {
     val jars = "/jar1,/jar2" // --jars
     val files = "hdfs:/file1,file2" // --files