Commit 1633d0a2 authored by Burak Yavuz, committed by Marcelo Vanzin

[SPARK-9263] Added flags to exclude dependencies when using --packages

While the functionality to exclude packages exists internally, there is no flag that allows users to exclude dependencies in case of dependency conflicts. We should provide users with a flag to add dependency exclusions in case the packages are not resolved properly (or are not available due to licensing).

The flag I added was --packages-exclude, but I'm open to renaming it. I also added property flags so that dependencies or exclusions can be supplied through a conf file, which is useful when the list of dependencies or exclusions is long.
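For illustration, an invocation using the flag under its final name in this patch (--exclude-packages, see the option parser change below), together with the equivalent conf-file properties added here (spark.jars.packages / spark.jars.excludes), might look like the following; the coordinates are hypothetical:

    spark-submit \
      --packages com.example:mylib:1.0 \
      --exclude-packages com.example.dep:conflicting-dep \
      app.jar

    # equivalent entries in a properties file passed with --properties-file:
    spark.jars.packages   com.example:mylib:1.0
    spark.jars.excludes   com.example.dep:conflicting-dep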

cc andrewor14 vanzin pwendell

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #7599 from brkyvz/packages-exclusions and squashes the following commits:

636f410 [Burak Yavuz] addressed nits
6e54ede [Burak Yavuz] is this the culprit
b5e508e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into packages-exclusions
154f5db [Burak Yavuz] addressed initial comments
1536d7a [Burak Yavuz] Added flags to exclude packages using --packages-exclude
parent b79b4f5f
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -24,6 +24,7 @@ import java.security.PrivilegedExceptionAction
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.ivy.Ivy
@@ -37,6 +38,7 @@ import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
+
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
@@ -275,21 +277,18 @@ object SparkSubmit {
 
     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
-    val resolvedMavenCoordinates =
-      SparkSubmitUtils.resolveMavenCoordinates(
-        args.packages, Option(args.repositories), Option(args.ivyRepoPath))
-    if (!resolvedMavenCoordinates.trim.isEmpty) {
-      if (args.jars == null || args.jars.trim.isEmpty) {
-        args.jars = resolvedMavenCoordinates
+    val exclusions: Seq[String] =
+      if (!StringUtils.isBlank(args.packagesExclusions)) {
+        args.packagesExclusions.split(",")
       } else {
-        args.jars += s",$resolvedMavenCoordinates"
+        Nil
       }
+    val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
+      Some(args.repositories), Some(args.ivyRepoPath), exclusions = exclusions)
+    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+      args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
       if (args.isPython) {
-        if (args.pyFiles == null || args.pyFiles.trim.isEmpty) {
-          args.pyFiles = resolvedMavenCoordinates
-        } else {
-          args.pyFiles += s",$resolvedMavenCoordinates"
-        }
+        args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
       }
     }
 
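The new exclusions value in the hunk above is just the comma-separated --exclude-packages string split into groupId:artifactId entries, with a blank or absent value mapping to Nil. A minimal, self-contained sketch of that step (parseExclusions is a hypothetical helper name, not part of the patch):

    import org.apache.commons.lang3.StringUtils

    // Mirrors the parsing above: a comma-separated value becomes a Seq of
    // groupId:artifactId entries; a null or whitespace-only value becomes Nil.
    def parseExclusions(raw: String): Seq[String] =
      if (!StringUtils.isBlank(raw)) raw.split(",").toSeq else Nil

    assert(parseExclusions("a:b,c:d") == Seq("a:b", "c:d"))
    assert(parseExclusions(null) == Nil)
    assert(parseExclusions("   ") == Nil)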
@@ -736,7 +735,7 @@ object SparkSubmit {
    * no files, into a single comma-separated string.
    */
   private def mergeFileLists(lists: String*): String = {
-    val merged = lists.filter(_ != null)
+    val merged = lists.filterNot(StringUtils.isBlank)
       .flatMap(_.split(","))
       .mkString(",")
     if (merged == "") null else merged
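The switch from filter(_ != null) to filterNot(StringUtils.isBlank) means mergeFileLists now drops whitespace-only entries as well as nulls, which is what lets the callers above pass args.jars or args.pyFiles unconditionally. A standalone sketch (the MergeDemo wrapper is hypothetical; the method body is the patched one):

    import org.apache.commons.lang3.StringUtils

    object MergeDemo {
      // Same body as the patched mergeFileLists above.
      private def mergeFileLists(lists: String*): String = {
        val merged = lists.filterNot(StringUtils.isBlank)
          .flatMap(_.split(","))
          .mkString(",")
        if (merged == "") null else merged
      }

      def main(args: Array[String]): Unit = {
        // The old filter(_ != null) would have kept "  " as a stray entry;
        // blank strings are now dropped along with nulls.
        println(mergeFileLists(null, "a.jar", "  ", "b.jar,c.jar")) // a.jar,b.jar,c.jar
      }
    }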
@@ -938,7 +937,7 @@ private[spark] object SparkSubmitUtils {
     // are supplied to spark-submit
     val alternateIvyCache = ivyPath.getOrElse("")
     val packagesDirectory: File =
-      if (alternateIvyCache.trim.isEmpty) {
+      if (alternateIvyCache == null || alternateIvyCache.trim.isEmpty) {
         new File(ivySettings.getDefaultIvyUserDir, "jars")
       } else {
         ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
@@ -1010,7 +1009,7 @@ private[spark] object SparkSubmitUtils {
     }
   }
 
-  private def createExclusion(
+  private[deploy] def createExclusion(
       coords: String,
       ivySettings: IvySettings,
       ivyConfName: String): ExcludeRule = {
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -59,6 +59,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var packages: String = null
   var repositories: String = null
   var ivyRepoPath: String = null
+  var packagesExclusions: String = null
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
@@ -172,6 +173,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
     jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
     ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
+    packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
+    packagesExclusions = Option(packagesExclusions)
+      .orElse(sparkProperties.get("spark.jars.excludes")).orNull
     deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
     numExecutors = Option(numExecutors)
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
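The Option(...).orElse(...).orNull chain above is the precedence pattern used throughout SparkSubmitArguments: a value given on the command line wins, otherwise the corresponding Spark property is consulted, otherwise the field stays null. A minimal sketch of the pattern with the new spark.jars.excludes property (the Map stands in for sparkProperties; the value is hypothetical):

    val sparkProperties = Map("spark.jars.excludes" -> "a:b")

    var packagesExclusions: String = null // nothing passed via --exclude-packages
    packagesExclusions = Option(packagesExclusions)
      .orElse(sparkProperties.get("spark.jars.excludes"))
      .orNull

    assert(packagesExclusions == "a:b")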
@@ -299,6 +303,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     |  childArgs               [${childArgs.mkString(" ")}]
     |  jars                    $jars
     |  packages                $packages
+    |  packagesExclusions      $packagesExclusions
     |  repositories            $repositories
     |  verbose                 $verbose
     |
@@ -391,6 +396,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case PACKAGES =>
         packages = value
 
+      case PACKAGES_EXCLUDE =>
+        packagesExclusions = value
+
       case REPOSITORIES =>
         repositories = value
@@ -482,6 +490,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |                              maven repo, then maven central and any additional remote
        |                              repositories given by --repositories. The format for the
        |                              coordinates should be groupId:artifactId:version.
+       |  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
+       |                              resolving the dependencies provided in --packages to avoid
+       |                              dependency conflicts.
        |  --repositories              Comma-separated list of additional remote repositories to
        |                              search for the maven coordinates given with --packages.
        |  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -95,6 +95,25 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     assert(md.getDependencies.length === 2)
   }
 
+  test("excludes works correctly") {
+    val md = SparkSubmitUtils.getModuleDescriptor
+    val excludes = Seq("a:b", "c:d")
+    excludes.foreach { e =>
+      md.addExcludeRule(SparkSubmitUtils.createExclusion(e + ":*", new IvySettings, "default"))
+    }
+    val rules = md.getAllExcludeRules
+    assert(rules.length === 2)
+    val rule1 = rules(0).getId.getModuleId
+    assert(rule1.getOrganisation === "a")
+    assert(rule1.getName === "b")
+    val rule2 = rules(1).getId.getModuleId
+    assert(rule2.getOrganisation === "c")
+    assert(rule2.getName === "d")
+    intercept[IllegalArgumentException] {
+      SparkSubmitUtils.createExclusion("e:f:g:h", new IvySettings, "default")
+    }
+  }
+
   test("ivy path works correctly") {
     val md = SparkSubmitUtils.getModuleDescriptor
     val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
@@ -168,4 +187,15 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
       assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
     }
   }
+
+  test("exclude dependencies end to end") {
+    val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
+    val dep = "my.great.dep:mydep:0.5"
+    IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
+      val files = SparkSubmitUtils.resolveMavenCoordinates(main.toString,
+        Some(repo), None, Seq("my.great.dep:mydep"), isTest = true)
+      assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
+      assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact")
+    }
+  }
 }
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -51,6 +51,7 @@ class SparkSubmitOptionParser {
   protected final String MASTER = "--master";
   protected final String NAME = "--name";
   protected final String PACKAGES = "--packages";
+  protected final String PACKAGES_EXCLUDE = "--exclude-packages";
   protected final String PROPERTIES_FILE = "--properties-file";
   protected final String PROXY_USER = "--proxy-user";
   protected final String PY_FILES = "--py-files";
@@ -105,6 +106,7 @@ class SparkSubmitOptionParser {
     { NAME },
     { NUM_EXECUTORS },
     { PACKAGES },
+    { PACKAGES_EXCLUDE },
     { PRINCIPAL },
     { PROPERTIES_FILE },
     { PROXY_USER },