diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 7505e2c236453b4648b46537dd3ba0081f9c9315..f53311c5c9f63ceceb3e178793317308e25f6088 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -687,12 +687,8 @@ class SessionCatalog(
    * Loads resources such as JARs and Files for a function. Every resource is represented
    * by a tuple (resource type, resource uri).
    */
-  def loadFunctionResources(resources: Seq[(String, String)]): Unit = {
-    resources.foreach { case (resourceType, uri) =>
-      val functionResource =
-        FunctionResource(FunctionResourceType.fromString(resourceType.toLowerCase), uri)
-      functionResourceLoader.loadResource(functionResource)
-    }
+  def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
+    resources.foreach(functionResourceLoader.loadResource)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
index 5adcc892cf682f5bf2c0c12a29edbecd730652b7..7da1fe93c6c7c8b6bb371a4222a16aff6e39a3a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
@@ -20,16 +20,16 @@ package org.apache.spark.sql.catalyst.catalog
 import org.apache.spark.sql.AnalysisException
 
 /** An trait that represents the type of a resourced needed by a function. */
-sealed trait FunctionResourceType
+abstract class FunctionResourceType(val resourceType: String)
 
-object JarResource extends FunctionResourceType
+object JarResource extends FunctionResourceType("jar")
 
-object FileResource extends FunctionResourceType
+object FileResource extends FunctionResourceType("file")
 
 // We do not allow users to specify a archive because it is YARN specific.
 // When loading resources, we will throw an exception and ask users to
 // use --archive with spark submit.
-object ArchiveResource extends FunctionResourceType
+object ArchiveResource extends FunctionResourceType("archive")
 
 object FunctionResourceType {
   def fromString(resourceType: String): FunctionResourceType = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2c6e9f53b27b173b19ced177767ea4dd130748fd..fc2068cac5ab2a3521b2db3efffa15d3daf01954 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -33,11 +33,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
  * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
  * @param resources resource types and Uris used by the function
  */
-// TODO: Use FunctionResource instead of (String, String) as the element type of resources.
 case class CatalogFunction(
     identifier: FunctionIdentifier,
     className: String,
-    resources: Seq[(String, String)])
+    resources: Seq[FunctionResource])
 
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 651be264850d009872b7602756707e87d857c8ee..ae190c0da6327090e073b889716184243124a0b8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -466,7 +466,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     assert(catalog.getFunction("db2", "func1") ==
       CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
-        Seq.empty[(String, String)]))
+        Seq.empty[FunctionResource]))
     intercept[AnalysisException] {
       catalog.getFunction("db2", "does_not_exist")
     }
@@ -679,7 +679,7 @@ abstract class CatalogTestUtils {
   }
 
   def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
-    CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[(String, String)])
+    CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[FunctionResource])
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index f2d2e99a3cad52ccb99cd30355614450d41cf41f..80422c20a6df56f4f718167fd2497fad36500d52 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -798,7 +798,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     val catalog = new SessionCatalog(newBasicCatalog())
     val expected =
       CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
-      Seq.empty[(String, String)])
+      Seq.empty[FunctionResource])
     assert(catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) == expected)
     // Get function without explicitly specifying database
     catalog.setCurrentDatabase("db2")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 086282d07c8dd38834cd7f724c660bea5bf917a7..87e6f9094daaf074c1252ab35682d2bae3d124de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -25,7 +25,7 @@ import org.antlr.v4.runtime.tree.TerminalNode
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
@@ -430,7 +430,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       val resourceType = resource.identifier.getText.toLowerCase
       resourceType match {
         case "jar" | "file" | "archive" =>
-          resourceType -> string(resource.STRING)
+          FunctionResource(FunctionResourceType.fromString(resourceType), string(resource.STRING))
         case other =>
           throw operationNotAllowed(s"CREATE FUNCTION with resource type '$resourceType'", ctx)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index a9aa8d797a3f910f3a3e5051d3b28f50dd0a0422..1ea9bc52999681a3c76602393291bbc990583897 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException}
-import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
@@ -39,12 +39,11 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
  *    AS className [USING JAR\FILE 'uri' [, JAR|FILE 'uri']]
  * }}}
  */
-// TODO: Use Seq[FunctionResource] instead of Seq[(String, String)] for resources.
 case class CreateFunction(
     databaseName: Option[String],
     functionName: String,
     className: String,
-    resources: Seq[(String, String)],
+    resources: Seq[FunctionResource],
     isTemp: Boolean)
   extends RunnableCommand {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index bd428a06f5099c65874e8e9c3f596f0982a506ae..a728ac3c8a42b9c7eff7f2061ce69a5447e944fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -156,13 +157,17 @@ class DDLCommandSuite extends PlanTest {
       None,
       "helloworld",
       "com.matthewrathbone.example.SimpleUDFExample",
-      Seq(("jar", "/path/to/jar1"), ("jar", "/path/to/jar2")),
+      Seq(
+        FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
+        FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
       isTemp = true)
     val expected2 = CreateFunction(
       Some("hello"),
       "world",
       "com.matthewrathbone.example.SimpleUDFExample",
-      Seq(("archive", "/path/to/archive"), ("file", "/path/to/file")),
+      Seq(
+        FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
+        FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
       isTemp = false)
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index cddc0b6e34a44d0f584cc14f62824e7941ce7f8a..bb324592028b8f138b7695adf00046a9b5d9eae6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -677,8 +677,9 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
-    val resourceUris = f.resources.map { case (resourceType, resourcePath) =>
-      new ResourceUri(ResourceType.valueOf(resourceType.toUpperCase), resourcePath)
+    val resourceUris = f.resources.map { resource =>
+      new ResourceUri(
+        ResourceType.valueOf(resource.resourceType.resourceType.toUpperCase()), resource.uri)
     }
     new HiveFunction(
       f.identifier.funcName,
@@ -700,7 +701,7 @@ private[hive] class HiveClientImpl(
        case ResourceType.JAR => "jar"
        case r => throw new AnalysisException(s"Unknown resource type: $r")
      }
-     (resourceType, uri.getUri())
+     FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
    }
    new CatalogFunction(name, hf.getClassName, resources)
  }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index bfe559f0b229f5a1804a6c93db3c62090d350ee7..d05a3623ae01fa782df3afe7ab6f3130a4915100 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
@@ -425,7 +425,7 @@ object PermanentHiveUDFTest2 extends Logging {
     val function = CatalogFunction(
       FunctionIdentifier("example_max"),
       "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
-      ("JAR" -> jar) :: Nil)
+      FunctionResource(JarResource, jar) :: Nil)
     hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false)
     val source =
       hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
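Not part of the patch: a minimal sketch of how the typed-resource API introduced above is meant to be used, built only from the classes and members visible in the hunks (FunctionResource, FunctionResourceType.fromString, JarResource, CatalogFunction). The jar path and function name below are placeholder values, not taken from the Spark code base.

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, FunctionResourceType, JarResource}

object FunctionResourceSketch {
  def main(args: Array[String]): Unit = {
    // A resource can be built from a typed singleton or parsed from a string
    // ("jar", "file", "archive"), mirroring what SparkSqlAstBuilder now does.
    val fromObject = FunctionResource(JarResource, "/path/to/udfs.jar")  // placeholder path
    val fromString = FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/udfs.jar")

    // CatalogFunction now carries Seq[FunctionResource] instead of (String, String) pairs.
    val function = CatalogFunction(
      FunctionIdentifier("example_max"),  // placeholder function name
      "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
      fromObject :: Nil)

    // resourceType.resourceType exposes the lower-case type name ("jar") and uri the
    // location, which is how HiveClientImpl.toHiveFunction reads them in the hunk above.
    function.resources.foreach(r => println(s"${r.resourceType.resourceType} -> ${r.uri}"))
  }
}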