From 1388fdd70733b92488f32bb31d2eb2ea4155ee62 Mon Sep 17 00:00:00 2001
From: Reza Safi <rezasafi@cloudera.com>
Date: Mon, 5 Jun 2017 13:19:01 -0700
Subject: [PATCH] [SPARK-20926][SQL] Removing exposures to the Guava library caused
 by directly accessing SessionCatalog's tableRelationCache

DataSourceStrategy, HiveMetastoreCatalog, and HiveSchemaInferenceSuite were exposed to the Guava library by directly accessing SessionCatalog's tableRelationCache, which can cause test failures when Guava shading is in place.
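
For illustration, a minimal self-contained sketch of the exposure (hypothetical names; plain String stands in for QualifiedTableName and LogicalPlan):

```scala
import com.google.common.cache.{Cache, CacheBuilder}

// Stand-in for SessionCatalog before this patch: the Guava Cache is public,
// so every caller compiles against com.google.common.cache.Cache.
class LeakyCatalog {
  val tableRelationCache: Cache[String, String] =
    CacheBuilder.newBuilder().maximumSize(1000L).build[String, String]()
}

object Caller {
  // The Guava type appears in this caller's own bytecode. If the build shades
  // (relocates) Guava in the catalog's module but not in the caller's, this
  // call site no longer links at runtime.
  def lookup(catalog: LeakyCatalog, key: String): String =
    catalog.tableRelationCache.getIfPresent(key)
}
```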

## What changes were proposed in this pull request?
This change removes those Guava exposures by making tableRelationCache private and introducing proxy methods on SessionCatalog (getCachedPlan, getCachedTable, cacheTable, invalidateCachedTable, invalidateAllCachedTables), and by updating DataSourceStrategy, HiveMetastoreCatalog, and HiveSchemaInferenceSuite to use those proxy methods instead.
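
A minimal sketch of the resulting pattern (hypothetical ProxiedCatalog; the method bodies mirror the SessionCatalog.scala hunk below):

```scala
import java.util.concurrent.Callable
import com.google.common.cache.{Cache, CacheBuilder}

// Stand-in for the patched SessionCatalog: the Guava Cache is private, and
// only JDK/Scala types cross the method boundaries, so callers never compile
// against com.google.common and are unaffected by Guava shading.
class ProxiedCatalog {
  private val tableRelationCache: Cache[String, String] =
    CacheBuilder.newBuilder().maximumSize(1000L).build[String, String]()

  def getCachedPlan(t: String, c: Callable[String]): String = tableRelationCache.get(t, c)
  def getCachedTable(key: String): String = tableRelationCache.getIfPresent(key)
  def cacheTable(t: String, l: String): Unit = tableRelationCache.put(t, l)
  def invalidateCachedTable(key: String): Unit = tableRelationCache.invalidate(key)
  def invalidateAllCachedTables(): Unit = tableRelationCache.invalidateAll()
}
```

Callers such as FindDataSourceTable then go through these methods only (for example `catalogProxy.getCachedPlan(qualifiedTableName, callable)`), as in the DataSourceStrategy.scala hunk below.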

## How was this patch tested?

Unit tests passed after applying these changes.

Author: Reza Safi <rezasafi@cloudera.com>

Closes #18148 from rezasafi/branch-2.2.
---
 .../sql/catalyst/catalog/SessionCatalog.scala | 31 ++++++++++++++++---
 .../datasources/DataSourceStrategy.scala      |  4 +--
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 16 +++++-----
 .../sql/hive/HiveSchemaInferenceSuite.scala   |  2 +-
 4 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a78440df4f..57006bfaf9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 
 import java.net.URI
 import java.util.Locale
+import java.util.concurrent.Callable
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -125,14 +126,36 @@ class SessionCatalog(
     if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
   }
 
-  /**
-   * A cache of qualified table names to table relation plans.
-   */
-  val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
+  private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
     val cacheSize = conf.tableRelationCacheSize
     CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
   }
 
+  /** Returns the plan cached for `t`, computing and caching it with `c` on a miss. */
+  def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
+    tableRelationCache.get(t, c)
+  }
+
+  /** Returns the plan cached for `key`, or null if no plan is cached for it. */
+  def getCachedTable(key: QualifiedTableName): LogicalPlan = {
+    tableRelationCache.getIfPresent(key)
+  }
+
+  /** Caches the plan `l` for the table name `t`. */
+  def cacheTable(t: QualifiedTableName, l: LogicalPlan): Unit = {
+    tableRelationCache.put(t, l)
+  }
+
+  /** Invalidates the plan cached for `key`, if any. */
+  def invalidateCachedTable(key: QualifiedTableName): Unit = {
+    tableRelationCache.invalidate(key)
+  }
+
+  /** Invalidates all cached plans. */
+  def invalidateAllCachedTables(): Unit = {
+    tableRelationCache.invalidateAll()
+  }
+
   /**
    * This method is used to make the given path qualified before we
    * store this path in the underlying external catalog. So, when a path
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 21d75a4049..e05a8d5f02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -215,9 +215,9 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
   private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
     val table = r.tableMeta
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
-    val cache = sparkSession.sessionState.catalog.tableRelationCache
+    val catalogProxy = sparkSession.sessionState.catalog
 
-    val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+    val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
         val dataSource =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6b98066cb7..9b3cbb63a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.types._
 private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
   // these are def_s and not val/lazy val since the latter would introduce circular references
   private def sessionState = sparkSession.sessionState
-  private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
+  private def catalogProxy = sparkSession.sessionState.catalog
   import HiveMetastoreCatalog._
 
   /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
@@ -61,7 +61,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val key = QualifiedTableName(
       table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
       table.table.toLowerCase)
-    tableRelationCache.getIfPresent(key)
+    catalogProxy.getCachedTable(key)
   }
 
   private def getCached(
@@ -71,7 +71,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       expectedFileFormat: Class[_ <: FileFormat],
       partitionSchema: Option[StructType]): Option[LogicalRelation] = {
 
-    tableRelationCache.getIfPresent(tableIdentifier) match {
+    catalogProxy.getCachedTable(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
@@ -92,21 +92,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               Some(logical)
             } else {
               // If the cached relation is not updated, we invalidate it right away.
-              tableRelationCache.invalidate(tableIdentifier)
+              catalogProxy.invalidateCachedTable(tableIdentifier)
               None
             }
           case _ =>
             logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
               s"However, we are getting a ${relation.fileFormat} from the metastore cache. " +
               "This cached entry will be invalidated.")
-            tableRelationCache.invalidate(tableIdentifier)
+            catalogProxy.invalidateCachedTable(tableIdentifier)
             None
         }
       case other =>
         logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
           s"However, we are getting a $other from the metastore cache. " +
           "This cached entry will be invalidated.")
-        tableRelationCache.invalidate(tableIdentifier)
+        catalogProxy.invalidateCachedTable(tableIdentifier)
         None
     }
   }
@@ -176,7 +176,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             fileFormat = fileFormat,
             options = options)(sparkSession = sparkSession)
           val created = LogicalRelation(fsRelation, updatedTable)
-          tableRelationCache.put(tableIdentifier, created)
+          catalogProxy.cacheTable(tableIdentifier, created)
           created
         }
 
@@ -205,7 +205,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
                 className = fileType).resolveRelation(),
               table = updatedTable)
 
-          tableRelationCache.put(tableIdentifier, created)
+          catalogProxy.cacheTable(tableIdentifier, created)
           created
         }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
index b3a06045b5..d271acc63d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -46,7 +46,7 @@ class HiveSchemaInferenceSuite
 
   override def afterEach(): Unit = {
     super.afterEach()
-    spark.sessionState.catalog.tableRelationCache.invalidateAll()
+    spark.sessionState.catalog.invalidateAllCachedTables()
     FileStatusCache.resetForTesting()
   }
 
-- 
GitLab