diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
index e8d999dd00135b6a9c9943dda414d49c2bef3304..462ca3f6f6d19d7e24ddf444bae238e20732f3ae 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 /**
  * Base interface for a function used in Dataset's filter function.
  *
- * If the function returns true, the element is discarded in the returned Dataset.
+ * If the function returns true, the element is included in the returned Dataset.
  */
 public interface FilterFunction<T> extends Serializable {
   boolean call(T value) throws Exception;
diff --git a/core/src/main/java/org/apache/spark/api/java/function/package.scala b/core/src/main/java/org/apache/spark/api/java/function/package.scala
index 0f9bac716416264aeba175b90c0b32570bc6dd81..e19f12fdac09081a56bc8896f8ff2fac3b73d20a 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/package.scala
+++ b/core/src/main/java/org/apache/spark/api/java/function/package.scala
@@ -22,4 +22,4 @@ package org.apache.spark.api.java
  * these interfaces to pass functions to various Java API methods for Spark. Please visit Spark's
  * Java programming guide for more details.
  */
-package object function
+package object function
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index a99bc3bff6eeaf6ca63835bb03802bddff67d1ba..6ddb1a7a1f1a52e0170417448d9e0c5eed244478 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -24,6 +24,8 @@ import org.apache.spark.sql.types.StructType
 
 /**
  * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 2.0.0
  */
 abstract class Catalog {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
index 0f7feb8eee7bef39048f635f30b3c33681037b04..33032f07f7bea6fa7681dafa1b08ea1dbd0b2906 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
@@ -25,6 +25,14 @@ import org.apache.spark.sql.catalyst.DefinedByConstructorParams
 // Note: all classes here are expected to be wrapped in Datasets and so must extend
 // DefinedByConstructorParams for the catalog to be able to create encoders for them.
 
+/**
+ * A database in Spark, as returned by the `listDatabases` method defined in [[Catalog]].
+ *
+ * @param name name of the database.
+ * @param description description of the database.
+ * @param locationUri path (in the form of a uri) to data files.
+ * @since 2.0.0
+ */
 class Database(
     val name: String,
     @Nullable val description: String,
@@ -41,6 +49,16 @@ class Database(
 }
 
 
+/**
+ * A table in Spark, as returned by the `listTables` method in [[Catalog]].
+ *
+ * @param name name of the table.
+ * @param database name of the database the table belongs to.
+ * @param description description of the table.
+ * @param tableType type of the table (e.g. view, table).
+ * @param isTemporary whether the table is a temporary table.
+ * @since 2.0.0
+ */
 class Table(
     val name: String,
     @Nullable val database: String,
@@ -61,6 +79,17 @@ class Table(
 }
 
 
+/**
+ * A column in Spark, as returned by the `listColumns` method in [[Catalog]].
+ *
+ * @param name name of the column.
+ * @param description description of the column.
+ * @param dataType data type of the column.
+ * @param nullable whether the column is nullable.
+ * @param isPartition whether the column is a partition column.
+ * @param isBucket whether the column is a bucket column.
+ * @since 2.0.0
+ */
 class Column(
     val name: String,
     @Nullable val description: String,
@@ -83,9 +112,19 @@ class Column(
 }
 
 
-// TODO(andrew): should we include the database here?
+/**
+ * A user-defined function in Spark, as returned by the `listFunctions` method in [[Catalog]].
+ *
+ * @param name name of the function.
+ * @param database name of the database the function belongs to.
+ * @param description description of the function; can be null.
+ * @param className the fully qualified class name of the function.
+ * @param isTemporary whether the function is a temporary function or not.
+ * @since 2.0.0
+ */
 class Function(
     val name: String,
+    @Nullable val database: String,
     @Nullable val description: String,
     val className: String,
     val isTemporary: Boolean)
@@ -94,6 +133,7 @@ class Function(
   override def toString: String = {
     "Function[" +
       s"name='$name', " +
+      Option(database).map { d => s"database='$d', " }.getOrElse("") +
       Option(description).map { d => s"description='$d', " }.getOrElse("") +
       s"className='$className', " +
       s"isTemporary='$isTemporary']"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index ceb68622752ea0100354317283040d7dd254243c..70e17b10ac3cb06de2c23103638cf3a383161898 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -125,6 +125,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
     new Function(
       name = funcIdent.identifier,
+      database = funcIdent.database.orNull,
       description = null, // for now, this is always undefined
       className = metadata.getClassName,
       isTemporary = funcIdent.database.isEmpty)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index cd434f7887db6ceb66ca67fcf5ccc0d306f84b3a..aec0312c4003e3541e8cc1e27976cc32af09e935 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.internal
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalog.{Column, Database, Function, Table}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, ScalaReflection, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
@@ -207,6 +207,14 @@ class CatalogSuite
     assert(!funcNames2.contains("my_func1"))
     assert(funcNames2.contains("my_func2"))
     assert(funcNames2.contains("my_temp_func"))
+
+    // Make sure database is set properly.
+    assert(
+      spark.catalog.listFunctions("my_db1").collect().map(_.database).toSet == Set("my_db1", null))
+    assert(
+      spark.catalog.listFunctions("my_db2").collect().map(_.database).toSet == Set("my_db2", null))
+
+    // Remove the functions and make sure they no longer appear.
     dropFunction("my_func1", Some("my_db1"))
     dropTempFunction("my_temp_func")
     val funcNames1b = spark.catalog.listFunctions("my_db1").collect().map(_.name).toSet
@@ -248,9 +256,11 @@ class CatalogSuite
   }
 
   test("Function.toString") {
-    assert(new Function("nama", "commenta", "classNameAh", isTemporary = true).toString ==
-      "Function[name='nama', description='commenta', className='classNameAh', isTemporary='true']")
-    assert(new Function("nama", null, "classNameAh", isTemporary = false).toString ==
+    assert(
+      new Function("nama", "databasa", "commenta", "classNameAh", isTemporary = true).toString ==
+        "Function[name='nama', database='databasa', description='commenta', " +
+          "className='classNameAh', isTemporary='true']")
+    assert(new Function("nama", null, null, "classNameAh", isTemporary = false).toString ==
       "Function[name='nama', className='classNameAh', isTemporary='false']")
   }
 
@@ -268,7 +278,7 @@ class CatalogSuite
   test("catalog classes format in Dataset.show") {
     val db = new Database("nama", "descripta", "locata")
     val table = new Table("nama", "databasa", "descripta", "typa", isTemporary = false)
-    val function = new Function("nama", "descripta", "classa", isTemporary = false)
+    val function = new Function("nama", "databasa", "descripta", "classa", isTemporary = false)
     val column = new Column(
       "nama", "descripta", "typa", nullable = false, isPartition = true, isBucket = true)
     val dbFields = ScalaReflection.getConstructorParameterValues(db)
@@ -277,7 +287,7 @@ class CatalogSuite
     val columnFields = ScalaReflection.getConstructorParameterValues(column)
     assert(dbFields == Seq("nama", "descripta", "locata"))
     assert(tableFields == Seq("nama", "databasa", "descripta", "typa", false))
-    assert(functionFields == Seq("nama", "descripta", "classa", false))
+    assert(functionFields == Seq("nama", "databasa", "descripta", "classa", false))
     assert(columnFields == Seq("nama", "descripta", "typa", false, true, true))
     val dbString = CatalogImpl.makeDataset(Seq(db), spark).showString(10)
     val tableString = CatalogImpl.makeDataset(Seq(table), spark).showString(10)
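
Reviewer's usage sketch (not part of the patch): the snippet below illustrates what the new `database` field on `Function` exposes, assuming a local `SparkSession`. Temporary functions have no parent database, so `database` comes back null for them, exactly as the new test asserts. The object name, app name, and the "default" database are made up for the example; the catalog calls (`listFunctions`, `name`, `database`, `isTemporary`) are the ones exercised in this diff.

import org.apache.spark.sql.SparkSession

// Illustrative driver only; names here are hypothetical.
object ListFunctionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("list-functions-sketch")
      .master("local[*]")
      .getOrCreate()

    // With this patch, each Function row reports its parent database;
    // temporary functions have no database, so the field is null.
    spark.catalog.listFunctions("default").collect().foreach { f =>
      val db = Option(f.database).getOrElse("<temporary>")
      println(s"function=${f.name} database=$db isTemporary=${f.isTemporary}")
    }

    spark.stop()
  }
}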