From 2ca60ace8f42cf0bd4569d86c86c37a8a2b6a37c Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Sun, 17 May 2015 12:43:15 -0700
Subject: [PATCH] [SPARK-7491] [SQL] Allow configuration of classloader
 isolation for hive

Author: Michael Armbrust <michael@databricks.com>

Closes #6167 from marmbrus/configureIsolation and squashes the following commits:

6147cbe [Michael Armbrust] filter other conf
22cc3bc7 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into configureIsolation
07476ee [Michael Armbrust] filter empty prefixes
dfdf19c [Michael Armbrust] [SPARK-6906][SQL] Allow configuration of classloader isolation for hive
---
 .../apache/spark/sql/hive/HiveContext.scala   | 33 +++++++++++++++++--
 .../hive/client/IsolatedClientLoader.scala    | 14 ++++----
 .../apache/spark/sql/hive/test/TestHive.scala |  9 ++++-
 3 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 9d98c36e94..2733ebdb95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -122,6 +122,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[hive] def hiveMetastoreJars: String =
     getConf(HIVE_METASTORE_JARS, "builtin")
 
+  /**
+   * A comma separated list of class prefixes that should be loaded using the classloader that
+   * is shared between Spark SQL and a specific version of Hive. An example of classes that should
+   * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
+   * to be shared are those that interact with classes that are already shared.  For example,
+   * custom appenders that are used by log4j.
+   */
+  protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
+    getConf("spark.sql.hive.metastore.sharedPrefixes", jdbcPrefixes)
+      .split(",").filterNot(_ == "")
+
+  private def jdbcPrefixes = Seq(
+    "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc").mkString(",")
+
+  /**
+   * A comma separated list of class prefixes that should explicitly be reloaded for each version
+   * of Hive that Spark SQL is communicating with.  For example, Hive UDFs that are declared in a
+   * prefix that typically would be shared (i.e. org.apache.spark.*)
+   */
+  protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
+    getConf("spark.sql.hive.metastore.barrierPrefixes", "")
+      .split(",").filterNot(_ == "")
+
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
 
@@ -179,12 +202,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         version = metaVersion,
         execJars = jars.toSeq,
         config = allConfig,
-        isolationOn = true)
+        isolationOn = true,
+        barrierPrefixes = hiveMetastoreBarrierPrefixes,
+        sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
       // TODO: Support for loading the jars from an already downloaded location.
       logInfo(
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
-      IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig )
+      IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig)
     } else {
       // Convert to files and expand any directories.
       val jars =
@@ -210,7 +235,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         version = metaVersion,
         execJars = jars.toSeq,
         config = allConfig,
-        isolationOn = true)
+        isolationOn = true,
+        barrierPrefixes = hiveMetastoreBarrierPrefixes,
+        sharedPrefixes = hiveMetastoreSharedPrefixes)
     }
     isolatedLoader.client
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 7f94c93ba4..196a3d836c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -56,8 +56,7 @@ private[hive] object IsolatedClientLoader {
         (if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil))
         .map(a => s"org.apache.hive:$a:${version.fullVersion}") :+
         "com.google.guava:guava:14.0.1" :+
-        "org.apache.hadoop:hadoop-client:2.4.0" :+
-        "mysql:mysql-connector-java:5.1.12"
+        "org.apache.hadoop:hadoop-client:2.4.0"
 
     val classpath = quietly {
       SparkSubmitUtils.resolveMavenCoordinates(
@@ -106,7 +105,9 @@ private[hive] class IsolatedClientLoader(
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
     val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
-    val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader)
+    val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
+    val sharedPrefixes: Seq[String] = Seq.empty,
+    val barrierPrefixes: Seq[String] = Seq.empty)
   extends Logging {
 
   // Check to make sure that the root classloader does not know about Hive.
@@ -122,13 +123,14 @@ private[hive] class IsolatedClientLoader(
     name.startsWith("scala.") ||
     name.startsWith("com.google") ||
     name.startsWith("java.lang.") ||
-    name.startsWith("java.net")
+    name.startsWith("java.net") ||
+    sharedPrefixes.exists(name.startsWith)
 
   /** True if `name` refers to a spark class that must see specific version of Hive. */
   protected def isBarrierClass(name: String): Boolean =
-    name.startsWith("org.apache.spark.sql.hive.execution.PairSerDe") ||
     name.startsWith(classOf[ClientWrapper].getName) ||
-    name.startsWith(classOf[ReflectionMagic].getName)
+    name.startsWith(classOf[ReflectionMagic].getName) ||
+    barrierPrefixes.exists(name.startsWith)
 
   protected def classToPath(name: String): String =
     name.replaceAll("\\.", "/") + ".class"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 1598d4bd47..9648284074 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -48,7 +48,14 @@ import scala.collection.JavaConversions._
 // SPARK-3729: Test key required to check for initialization errors with config.
 object TestHive
   extends TestHiveContext(
-    new SparkContext("local[2]", "TestSQLContext", new SparkConf().set("spark.sql.test", "")))
+    new SparkContext(
+      "local[2]",
+      "TestSQLContext",
+      new SparkConf()
+        .set("spark.sql.test", "")
+        .set(
+          "spark.sql.hive.metastore.barrierPrefixes",
+          "org.apache.spark.sql.hive.execution.PairSerDe")))
 
 /**
  * A locally running test instance of Spark's Hive execution engine.
-- 
GitLab