Commit 223f1d58 authored by Reynold Xin, committed by Andrew Or

[SPARK-15662][SQL] Add since annotation for classes in sql.catalog

## What changes were proposed in this pull request?
This patch does a few things:

1. Adds `@since` version annotations to the methods and classes in sql.catalog.
2. Fixes a typo in FilterFunction and a whitespace issue in spark/api/java/function/package.scala.
3. Adds a "database" field to the Function class (see the usage sketch below).

## How was this patch tested?
Updated unit test case for "database" field in Function class.

Author: Reynold Xin <rxin@databricks.com>

Closes #13406 from rxin/SPARK-15662.
parent 69547042
FilterFunction.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 /**
  * Base interface for a function used in Dataset's filter function.
  *
- * If the function returns true, the element is discarded in the returned Dataset.
+ * If the function returns true, the element is included in the returned Dataset.
  */
 public interface FilterFunction<T> extends Serializable {
   boolean call(T value) throws Exception;
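
For context on the corrected comment, a minimal sketch of using this interface from Scala (assuming a SparkSession named `spark`): elements for which `call` returns true are kept in the result.

```scala
import org.apache.spark.api.java.function.FilterFunction

// Keep only the even numbers: `call` returning true means "include".
val evens = spark.range(10).filter(new FilterFunction[java.lang.Long] {
  override def call(value: java.lang.Long): Boolean = value % 2 == 0
})
evens.show()  // 0, 2, 4, 6, 8
```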
package.scala (removes a trailing space after `package object function`)
@@ -22,4 +22,4 @@ package org.apache.spark.api.java
  * these interfaces to pass functions to various Java API methods for Spark. Please visit Spark's
  * Java programming guide for more details.
  */
-package object function 
+package object function
Catalog.scala
@@ -24,6 +24,8 @@ import org.apache.spark.sql.types.StructType
 /**
  * Catalog interface for Spark. To access this, use `SparkSession.catalog`.
+ *
+ * @since 2.0.0
  */
 abstract class Catalog {
interface.scala (org.apache.spark.sql.catalog)
@@ -25,6 +25,14 @@ import org.apache.spark.sql.catalyst.DefinedByConstructorParams
 // Note: all classes here are expected to be wrapped in Datasets and so must extend
 // DefinedByConstructorParams for the catalog to be able to create encoders for them.
 
+/**
+ * A database in Spark, as returned by the `listDatabases` method defined in [[Catalog]].
+ *
+ * @param name name of the database.
+ * @param description description of the database.
+ * @param locationUri path (in the form of a uri) to data files.
+ * @since 2.0.0
+ */
 class Database(
     val name: String,
     @Nullable val description: String,

@@ -41,6 +49,16 @@ class Database(
 }
 
+/**
+ * A table in Spark, as returned by the `listTables` method in [[Catalog]].
+ *
+ * @param name name of the table.
+ * @param database name of the database the table belongs to.
+ * @param description description of the table.
+ * @param tableType type of the table (e.g. view, table).
+ * @param isTemporary whether the table is a temporary table.
+ * @since 2.0.0
+ */
 class Table(
     val name: String,
     @Nullable val database: String,

@@ -61,6 +79,17 @@ class Table(
 }
 
+/**
+ * A column in Spark, as returned by `listColumns` method in [[Catalog]].
+ *
+ * @param name name of the column.
+ * @param description description of the column.
+ * @param dataType data type of the column.
+ * @param nullable whether the column is nullable.
+ * @param isPartition whether the column is a partition column.
+ * @param isBucket whether the column is a bucket column.
+ * @since 2.0.0
+ */
 class Column(
     val name: String,
     @Nullable val description: String,

@@ -83,9 +112,19 @@ class Column(
 }
 
-// TODO(andrew): should we include the database here?
+/**
+ * A user-defined function in Spark, as returned by `listFunctions` method in [[Catalog]].
+ *
+ * @param name name of the function.
+ * @param database name of the database the function belongs to.
+ * @param description description of the function; description can be null.
+ * @param className the fully qualified class name of the function.
+ * @param isTemporary whether the function is a temporary function or not.
+ * @since 2.0.0
+ */
 class Function(
     val name: String,
+    @Nullable val database: String,
     @Nullable val description: String,
     val className: String,
     val isTemporary: Boolean)

@@ -94,6 +133,7 @@ class Function(
   override def toString: String = {
     "Function[" +
       s"name='$name', " +
+      Option(database).map { d => s"database='$d', " }.getOrElse("") +
       Option(description).map { d => s"description='$d', " }.getOrElse("") +
       s"className='$className', " +
       s"isTemporary='$isTemporary']"
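
The `Option(...).map(...).getOrElse("")` chain above is a common Scala idiom for rendering a field only when it is non-null. A standalone sketch of the pattern (the `field` helper is hypothetical, not part of the patch):

```scala
// Render "label='value', " when value is non-null, else nothing.
def field(label: String, value: String): String =
  Option(value).map(v => s"$label='$v', ").getOrElse("")

field("database", "my_db")  // "database='my_db', "
field("database", null)     // "" (Option(null) is None)
```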
CatalogImpl.scala
@@ -125,6 +125,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
     new Function(
       name = funcIdent.identifier,
+      database = funcIdent.database.orNull,
       description = null, // for now, this is always undefined
       className = metadata.getClassName,
       isTemporary = funcIdent.database.isEmpty)
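
Here `funcIdent.database` is an `Option[String]`, and `.orNull` unwraps it to the nullable form the `Function` constructor expects. A quick illustration of the conversion:

```scala
val qualified: Option[String] = Some("my_db")
val temporary: Option[String] = None

qualified.orNull  // "my_db"
temporary.orNull  // null
```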
CatalogSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.internal
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalog.{Column, Database, Function, Table}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, ScalaReflection, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._

@@ -207,6 +207,14 @@ class CatalogSuite
     assert(!funcNames2.contains("my_func1"))
     assert(funcNames2.contains("my_func2"))
     assert(funcNames2.contains("my_temp_func"))
+
+    // Make sure database is set properly.
+    assert(
+      spark.catalog.listFunctions("my_db1").collect().map(_.database).toSet == Set("my_db1", null))
+    assert(
+      spark.catalog.listFunctions("my_db2").collect().map(_.database).toSet == Set("my_db2", null))
+
+    // Remove the function and make sure they no longer appear.
     dropFunction("my_func1", Some("my_db1"))
     dropTempFunction("my_temp_func")
     val funcNames1b = spark.catalog.listFunctions("my_db1").collect().map(_.name).toSet

@@ -248,9 +256,11 @@ class CatalogSuite
   }
 
   test("Function.toString") {
-    assert(new Function("nama", "commenta", "classNameAh", isTemporary = true).toString ==
-      "Function[name='nama', description='commenta', className='classNameAh', isTemporary='true']")
-    assert(new Function("nama", null, "classNameAh", isTemporary = false).toString ==
+    assert(
+      new Function("nama", "databasa", "commenta", "classNameAh", isTemporary = true).toString ==
+        "Function[name='nama', database='databasa', description='commenta', " +
+          "className='classNameAh', isTemporary='true']")
+    assert(new Function("nama", null, null, "classNameAh", isTemporary = false).toString ==
       "Function[name='nama', className='classNameAh', isTemporary='false']")
   }

@@ -268,7 +278,7 @@ class CatalogSuite
   test("catalog classes format in Dataset.show") {
     val db = new Database("nama", "descripta", "locata")
     val table = new Table("nama", "databasa", "descripta", "typa", isTemporary = false)
-    val function = new Function("nama", "descripta", "classa", isTemporary = false)
+    val function = new Function("nama", "databasa", "descripta", "classa", isTemporary = false)
     val column = new Column(
       "nama", "descripta", "typa", nullable = false, isPartition = true, isBucket = true)
     val dbFields = ScalaReflection.getConstructorParameterValues(db)

@@ -277,7 +287,7 @@ class CatalogSuite
     val columnFields = ScalaReflection.getConstructorParameterValues(column)
     assert(dbFields == Seq("nama", "descripta", "locata"))
     assert(tableFields == Seq("nama", "databasa", "descripta", "typa", false))
-    assert(functionFields == Seq("nama", "descripta", "classa", false))
+    assert(functionFields == Seq("nama", "databasa", "descripta", "classa", false))
     assert(columnFields == Seq("nama", "descripta", "typa", false, true, true))
     val dbString = CatalogImpl.makeDataset(Seq(db), spark).showString(10)
     val tableString = CatalogImpl.makeDataset(Seq(table), spark).showString(10)