Commit 2ca60ace authored by Michael Armbrust

[SPARK-7491] [SQL] Allow configuration of classloader isolation for hive

Author: Michael Armbrust <michael@databricks.com>

Closes #6167 from marmbrus/configureIsolation and squashes the following commits:

6147cbe [Michael Armbrust] filter other conf
22cc3bc7 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into configureIsolation
07476ee [Michael Armbrust] filter empty prefixes
dfdf19c [Michael Armbrust] [SPARK-6906][SQL] Allow configuration of classloader isolation for hive
parent 56456287
@@ -122,6 +122,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[hive] def hiveMetastoreJars: String =
     getConf(HIVE_METASTORE_JARS, "builtin")
 
+  /**
+   * A comma separated list of class prefixes that should be loaded using the classloader that
+   * is shared between Spark SQL and a specific version of Hive. An example of classes that should
+   * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
+   * to be shared are those that interact with classes that are already shared. For example,
+   * custom appenders that are used by log4j.
+   */
+  protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
+    getConf("spark.sql.hive.metastore.sharedPrefixes", jdbcPrefixes)
+      .split(",").filterNot(_ == "")
+
+  private def jdbcPrefixes = Seq(
+    "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc").mkString(",")
+
+  /**
+   * A comma separated list of class prefixes that should explicitly be reloaded for each version
+   * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a
+   * prefix that typically would be shared (i.e. org.apache.spark.*)
+   */
+  protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
+    getConf("spark.sql.hive.metastore.barrierPrefixes", "")
+      .split(",").filterNot(_ == "")
+
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
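
For reference, a minimal sketch of how the two new options can be set when constructing a HiveContext. The property keys come from the hunk above; the prefix values and application name are illustrative only:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Illustrative values: the "com.example.*" packages are hypothetical stand-ins.
val conf = new SparkConf()
  // Load a metastore JDBC driver and a custom log4j appender from the shared classloader.
  .set("spark.sql.hive.metastore.sharedPrefixes", "com.mysql.jdbc,com.example.logging")
  // Reload these classes for each version of Hive that Spark SQL talks to.
  .set("spark.sql.hive.metastore.barrierPrefixes", "com.example.hive.udfs")

val sc = new SparkContext("local[2]", "example", conf)
val hive = new HiveContext(sc)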
@@ -179,12 +202,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         version = metaVersion,
         execJars = jars.toSeq,
         config = allConfig,
-        isolationOn = true)
+        isolationOn = true,
+        barrierPrefixes = hiveMetastoreBarrierPrefixes,
+        sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
       // TODO: Support for loading the jars from an already downloaded location.
       logInfo(
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
-      IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig )
+      IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig)
     } else {
       // Convert to files and expand any directories.
       val jars =
@@ -210,7 +235,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         version = metaVersion,
         execJars = jars.toSeq,
         config = allConfig,
-        isolationOn = true)
+        isolationOn = true,
+        barrierPrefixes = hiveMetastoreBarrierPrefixes,
+        sharedPrefixes = hiveMetastoreSharedPrefixes)
     }
     isolatedLoader.client
   }
@@ -56,8 +56,7 @@ private[hive] object IsolatedClientLoader {
         (if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil))
         .map(a => s"org.apache.hive:$a:${version.fullVersion}") :+
         "com.google.guava:guava:14.0.1" :+
-        "org.apache.hadoop:hadoop-client:2.4.0" :+
-        "mysql:mysql-connector-java:5.1.12"
+        "org.apache.hadoop:hadoop-client:2.4.0"
 
     val classpath = quietly {
       SparkSubmitUtils.resolveMavenCoordinates(
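
Note that the pinned mysql:mysql-connector-java artifact is dropped from the maven-resolved classpath here; with this change, JDBC drivers needed to reach the metastore are presumably meant to arrive through the shared classloader instead (com.mysql.jdbc is among the default jdbcPrefixes defined above).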
@@ -106,7 +105,9 @@ private[hive] class IsolatedClientLoader(
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
     val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
-    val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader)
+    val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
+    val sharedPrefixes: Seq[String] = Seq.empty,
+    val barrierPrefixes: Seq[String] = Seq.empty)
   extends Logging {
 
   // Check to make sure that the root classloader does not know about Hive.
@@ -122,13 +123,14 @@ private[hive] class IsolatedClientLoader(
     name.startsWith("scala.") ||
     name.startsWith("com.google") ||
     name.startsWith("java.lang.") ||
-    name.startsWith("java.net")
+    name.startsWith("java.net") ||
+    sharedPrefixes.exists(name.startsWith)
 
   /** True if `name` refers to a spark class that must see specific version of Hive. */
   protected def isBarrierClass(name: String): Boolean =
-    name.startsWith("org.apache.spark.sql.hive.execution.PairSerDe") ||
     name.startsWith(classOf[ClientWrapper].getName) ||
-    name.startsWith(classOf[ReflectionMagic].getName)
+    name.startsWith(classOf[ReflectionMagic].getName) ||
+    barrierPrefixes.exists(name.startsWith)
 
   protected def classToPath(name: String): String =
     name.replaceAll("\\.", "/") + ".class"
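
To make the predicates above concrete, here is a minimal, self-contained sketch (not the actual IsolatedClientLoader code) of how a prefix-based classifier can decide which loader serves a class, assuming barrier checks take precedence over shared ones:

object LoaderDispatchSketch {
  sealed trait Target
  case object Shared extends Target   // delegate to the classloader shared with Spark
  case object Barrier extends Target  // re-define per Hive version from the class bytes
  case object Isolated extends Target // resolve from the isolated Hive jars

  // Classify a fully qualified class name using the two configured prefix lists.
  def classify(
      name: String,
      sharedPrefixes: Seq[String],
      barrierPrefixes: Seq[String]): Target =
    if (barrierPrefixes.exists(name.startsWith)) Barrier
    else if (name.startsWith("java.") || sharedPrefixes.exists(name.startsWith)) Shared
    else Isolated
}

For example, classify("org.apache.hadoop.hive.ql.exec.UDF", Seq("com.mysql.jdbc"), Nil) falls through to Isolated, while a name matching a barrier prefix is reloaded for each Hive version.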
@@ -48,7 +48,14 @@ import scala.collection.JavaConversions._
 // SPARK-3729: Test key required to check for initialization errors with config.
 object TestHive
   extends TestHiveContext(
-    new SparkContext("local[2]", "TestSQLContext", new SparkConf().set("spark.sql.test", "")))
+    new SparkContext(
+      "local[2]",
+      "TestSQLContext",
+      new SparkConf()
+        .set("spark.sql.test", "")
+        .set(
+          "spark.sql.hive.metastore.barrierPrefixes",
+          "org.apache.spark.sql.hive.execution.PairSerDe")))
 
 /**
  * A locally running test instance of Spark's Hive execution engine.
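
Note that this TestHive change replaces the barrier prefix previously hard-coded in isBarrierClass (org.apache.spark.sql.hive.execution.PairSerDe) with the same value supplied through spark.sql.hive.metastore.barrierPrefixes, so the test suite now exercises the new configuration path.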