From da02d006bbb5c4fe62abd5542b9fff7d1c58603c Mon Sep 17 00:00:00 2001
From: Sandeep Singh <sandeep@techaddict.me>
Date: Tue, 10 May 2016 14:21:47 -0700
Subject: [PATCH] [SPARK-15249][SQL] Use FunctionResource instead of (String,
 String) in CreateFunction and CatalogFunction for resource

Use FunctionResource instead of (String, String) in CreateFunction and CatalogFunction for resource
see: TODO's here
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L36
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala#L42

Existing tests

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #13024 from techaddict/SPARK-15249.
---
 .../spark/sql/catalyst/catalog/SessionCatalog.scala      | 8 ++------
 .../spark/sql/catalyst/catalog/functionResources.scala   | 8 ++++----
 .../apache/spark/sql/catalyst/catalog/interface.scala    | 3 +--
 .../sql/catalyst/catalog/ExternalCatalogSuite.scala      | 4 ++--
 .../spark/sql/catalyst/catalog/SessionCatalogSuite.scala | 2 +-
 .../org/apache/spark/sql/execution/SparkSqlParser.scala  | 4 ++--
 .../apache/spark/sql/execution/command/functions.scala   | 5 ++---
 .../spark/sql/execution/command/DDLCommandSuite.scala    | 9 +++++++--
 .../apache/spark/sql/hive/client/HiveClientImpl.scala    | 7 ++++---
 .../org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 4 ++--
 10 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 7505e2c236..f53311c5c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -687,12 +687,8 @@ class SessionCatalog(
    * Loads resources such as JARs and Files for a function. Every resource is represented
    * by a tuple (resource type, resource uri).
    */
-  def loadFunctionResources(resources: Seq[(String, String)]): Unit = {
-    resources.foreach { case (resourceType, uri) =>
-      val functionResource =
-        FunctionResource(FunctionResourceType.fromString(resourceType.toLowerCase), uri)
-      functionResourceLoader.loadResource(functionResource)
-    }
+  def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
+    resources.foreach(functionResourceLoader.loadResource)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
index 5adcc892cf..7da1fe93c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
@@ -20,16 +20,16 @@ package org.apache.spark.sql.catalyst.catalog
 import org.apache.spark.sql.AnalysisException
 
 /** An trait that represents the type of a resourced needed by a function. */
-sealed trait FunctionResourceType
+abstract class FunctionResourceType(val resourceType: String)
 
-object JarResource extends FunctionResourceType
+object JarResource extends FunctionResourceType("jar")
 
-object FileResource extends FunctionResourceType
+object FileResource extends FunctionResourceType("file")
 
 // We do not allow users to specify a archive because it is YARN specific.
 // When loading resources, we will throw an exception and ask users to
 // use --archive with spark submit.
-object ArchiveResource extends FunctionResourceType
+object ArchiveResource extends FunctionResourceType("archive")
 
 object FunctionResourceType {
   def fromString(resourceType: String): FunctionResourceType = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2c6e9f53b2..fc2068cac5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -33,11 +33,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
  * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
  * @param resources resource types and Uris used by the function
  */
-// TODO: Use FunctionResource instead of (String, String) as the element type of resources.
 case class CatalogFunction(
     identifier: FunctionIdentifier,
     className: String,
-    resources: Seq[(String, String)])
+    resources: Seq[FunctionResource])
 
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 651be26485..ae190c0da6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -466,7 +466,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     assert(catalog.getFunction("db2", "func1") ==
       CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
-        Seq.empty[(String, String)]))
+        Seq.empty[FunctionResource]))
     intercept[AnalysisException] {
       catalog.getFunction("db2", "does_not_exist")
     }
@@ -679,7 +679,7 @@ abstract class CatalogTestUtils {
   }
 
   def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
-    CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[(String, String)])
+    CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[FunctionResource])
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index f2d2e99a3c..80422c20a6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -798,7 +798,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     val catalog = new SessionCatalog(newBasicCatalog())
     val expected =
       CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
-      Seq.empty[(String, String)])
+      Seq.empty[FunctionResource])
     assert(catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) == expected)
     // Get function without explicitly specifying database
     catalog.setCurrentDatabase("db2")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 086282d07c..87e6f9094d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -25,7 +25,7 @@ import org.antlr.v4.runtime.tree.TerminalNode
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
@@ -430,7 +430,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       val resourceType = resource.identifier.getText.toLowerCase
       resourceType match {
         case "jar" | "file" | "archive" =>
-          resourceType -> string(resource.STRING)
+          FunctionResource(FunctionResourceType.fromString(resourceType), string(resource.STRING))
         case other =>
           throw operationNotAllowed(s"CREATE FUNCTION with resource type '$resourceType'", ctx)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index a9aa8d797a..1ea9bc5299 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException}
-import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
@@ -39,12 +39,11 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
  *    AS className [USING JAR\FILE 'uri' [, JAR|FILE 'uri']]
  * }}}
  */
-// TODO: Use Seq[FunctionResource] instead of Seq[(String, String)] for resources.
 case class CreateFunction(
     databaseName: Option[String],
     functionName: String,
     className: String,
-    resources: Seq[(String, String)],
+    resources: Seq[FunctionResource],
     isTemp: Boolean)
   extends RunnableCommand {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index bd428a06f5..a728ac3c8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -156,13 +157,17 @@ class DDLCommandSuite extends PlanTest {
       None,
       "helloworld",
       "com.matthewrathbone.example.SimpleUDFExample",
-      Seq(("jar", "/path/to/jar1"), ("jar", "/path/to/jar2")),
+      Seq(
+        FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
+        FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
       isTemp = true)
     val expected2 = CreateFunction(
       Some("hello"),
       "world",
       "com.matthewrathbone.example.SimpleUDFExample",
-      Seq(("archive", "/path/to/archive"), ("file", "/path/to/file")),
+      Seq(
+        FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
+        FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
       isTemp = false)
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index cddc0b6e34..bb32459202 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -677,8 +677,9 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
-    val resourceUris = f.resources.map { case (resourceType, resourcePath) =>
-      new ResourceUri(ResourceType.valueOf(resourceType.toUpperCase), resourcePath)
+    val resourceUris = f.resources.map { resource =>
+      new ResourceUri(
+        ResourceType.valueOf(resource.resourceType.resourceType.toUpperCase()), resource.uri)
     }
     new HiveFunction(
       f.identifier.funcName,
@@ -700,7 +701,7 @@ private[hive] class HiveClientImpl(
         case ResourceType.JAR => "jar"
         case r => throw new AnalysisException(s"Unknown resource type: $r")
       }
-      (resourceType, uri.getUri())
+      FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
     }
     new CatalogFunction(name, hf.getClassName, resources)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index bfe559f0b2..d05a3623ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
@@ -425,7 +425,7 @@ object PermanentHiveUDFTest2 extends Logging {
     val function = CatalogFunction(
       FunctionIdentifier("example_max"),
       "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
-      ("JAR" -> jar) :: Nil)
+      FunctionResource(JarResource, jar) :: Nil)
     hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false)
     val source =
       hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
-- 
GitLab