Commit e6495800 authored by Reynold Xin

[SPARK-13884][SQL] Remove DescribeCommand's dependency on LogicalPlan

## What changes were proposed in this pull request?
This patch removes DescribeCommand's dependency on LogicalPlan. After this patch, DescribeCommand simply accepts a TableIdentifier. This minimizes the dependency and unblocks my next patch, which removes the SQLContext dependency from SparkPlanner.
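
At its core this is a signature change on the command. A minimal before/after sketch (simplified: the real class also extends LogicalPlan with logical.Command and declares its output attributes):

    // Before: the command carried a whole (unresolved) logical subtree.
    case class DescribeCommand(table: LogicalPlan, isExtended: Boolean)

    // After: it carries only the name; the table is looked up in the
    // session catalog when the command runs.
    case class DescribeCommand(table: TableIdentifier, isExtended: Boolean)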

## How was this patch tested?
Should be covered by existing unit tests and the Hive compatibility tests that run DESCRIBE TABLE.

Author: Reynold Xin <rxin@databricks.com>

Closes #11710 from rxin/SPARK-13884.
parent f72743d9
@@ -271,15 +271,14 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
           // issue.
           val tableIdent = TableIdentifier(
             cleanIdentifier(tableName), Some(cleanIdentifier(dbName)))
-          datasources.DescribeCommand(
-            UnresolvedRelation(tableIdent, None), isExtended = extended.isDefined)
+          datasources.DescribeCommand(tableIdent, isExtended = extended.isDefined)
         case Token(dbName, Nil) :: Token(tableName, Nil) :: Token(colName, Nil) :: Nil =>
           // It is describing a column with the format like "describe db.table column".
           nodeToDescribeFallback(node)
         case tableName :: Nil =>
           // It is describing a table with the format like "describe table".
           datasources.DescribeCommand(
-            UnresolvedRelation(TableIdentifier(cleanIdentifier(tableName.text)), None),
+            TableIdentifier(cleanIdentifier(tableName.text)),
             isExtended = extended.isDefined)
         case _ =>
           nodeToDescribeFallback(node)
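
To make the parser change concrete, a hedged sketch of what "DESCRIBE db.tbl" now produces (the literal names are hypothetical; in the real parser they come from AST tokens, and cleanIdentifier strips surrounding backticks):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // No UnresolvedRelation is constructed at parse time anymore;
    // the command holds just the identifier.
    val cmd = datasources.DescribeCommand(
      TableIdentifier("tbl", Some("db")),
      isExtended = false)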
@@ -398,11 +398,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")

       case describe @ LogicalDescribeCommand(table, isExtended) =>
-        val resultPlan = self.sqlContext.executePlan(table).executedPlan
-        ExecutedCommand(
-          RunnableDescribeCommand(resultPlan, describe.output, isExtended)) :: Nil
+        ExecutedCommand(RunnableDescribeCommand(table, describe.output, isExtended)) :: Nil

-      case logical.ShowFunctions(db, pattern) => ExecutedCommand(ShowFunctions(db, pattern)) :: Nil
+      case logical.ShowFunctions(db, pattern) =>
+        ExecutedCommand(ShowFunctions(db, pattern)) :: Nil

       case logical.DescribeFunction(function, extended) =>
         ExecutedCommand(DescribeFunction(function, extended)) :: Nil
@@ -22,7 +22,7 @@ import java.util.NoSuchElementException
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Row, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical
@@ -293,13 +293,14 @@ case object ClearCacheCommand extends RunnableCommand {
 case class DescribeCommand(
-    child: SparkPlan,
+    table: TableIdentifier,
     override val output: Seq[Attribute],
     isExtended: Boolean)
   extends RunnableCommand {

   override def run(sqlContext: SQLContext): Seq[Row] = {
-    child.schema.fields.map { field =>
+    val relation = sqlContext.sessionState.catalog.lookupRelation(table)
+    relation.schema.fields.map { field =>
       val cmtKey = "comment"
       val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
       Row(field.name, field.dataType.simpleString, comment)
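
A hedged usage sketch of the new execution path for a non-Hive table (describeOutput is a hypothetical name; the real output is the usual col_name / data_type / comment attributes):

    // Resolution is now deferred to run(): the catalog lookup happens only
    // when the DESCRIBE actually executes, so no LogicalPlan needs to be
    // carried around beforehand.
    val cmd = DescribeCommand(
      TableIdentifier("people"),
      output = describeOutput,  // hypothetical Seq[Attribute] of the three columns
      isExtended = false)
    val rows: Seq[Row] = cmd.run(sqlContext)  // one Row per field in the schema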
@@ -33,8 +33,9 @@ import org.apache.spark.sql.types._
  * It is effective only when the table is a Hive table.
  */
 case class DescribeCommand(
-    table: LogicalPlan,
-    isExtended: Boolean) extends LogicalPlan with logical.Command {
+    table: TableIdentifier,
+    isExtended: Boolean)
+  extends LogicalPlan with logical.Command {

   override def children: Seq[LogicalPlan] = Seq.empty
@@ -102,18 +102,8 @@ private[hive] trait HiveStrategies {
   case class HiveCommandStrategy(context: HiveContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case describe: DescribeCommand =>
-        val resolvedTable = context.executePlan(describe.table).analyzed
-        resolvedTable match {
-          case t: MetastoreRelation =>
-            ExecutedCommand(
-              DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil
-          case o: LogicalPlan =>
-            val resultPlan = context.executePlan(o).executedPlan
-            ExecutedCommand(RunnableDescribeCommand(
-              resultPlan, describe.output, describe.isExtended)) :: Nil
-        }
+        ExecutedCommand(
+          DescribeHiveTableCommand(describe.table, describe.output, describe.isExtended)) :: Nil

       case _ => Nil
     }
   }
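
For readers less familiar with the planner, the contract this hunk relies on, as a minimal sketch rather than the actual trait definition: a strategy maps a logical plan node to candidate physical plans, and Nil means "not my node".

    // Sketch of the Strategy shape assumed above.
    abstract class Strategy {
      def apply(plan: LogicalPlan): Seq[SparkPlan]
    }

The one-case simplification works because the MetastoreRelation-vs-other dispatch that used to live here moves into DescribeHiveTableCommand.run (last hunk below).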
@@ -22,8 +22,10 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.metastore.api.FieldSchema

 import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{DescribeCommand, RunnableCommand}
 import org.apache.spark.sql.hive.MetastoreRelation

 /**
@@ -31,33 +33,44 @@ import org.apache.spark.sql.hive.MetastoreRelation
  */
 private[hive]
 case class DescribeHiveTableCommand(
-    table: MetastoreRelation,
+    tableId: TableIdentifier,
     override val output: Seq[Attribute],
     isExtended: Boolean) extends RunnableCommand {

   override def run(sqlContext: SQLContext): Seq[Row] = {
-    // Trying to mimic the format of Hive's output. But not exactly the same.
-    var results: Seq[(String, String, String)] = Nil
-
-    val columns: Seq[FieldSchema] = table.hiveQlTable.getCols.asScala
-    val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols.asScala
-    results ++= columns.map(field => (field.getName, field.getType, field.getComment))
-    if (partitionColumns.nonEmpty) {
-      val partColumnInfo =
-        partitionColumns.map(field => (field.getName, field.getType, field.getComment))
-      results ++=
-        partColumnInfo ++
-          Seq(("# Partition Information", "", "")) ++
-          Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++
-          partColumnInfo
-    }
-
-    if (isExtended) {
-      results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
-    }
-
-    results.map { case (name, dataType, comment) =>
-      Row(name, dataType, comment)
+    // There are two modes here:
+    // For metastore tables, create an output similar to Hive's.
+    // For other tables, delegate to DescribeCommand.
+
+    // In the future, we will consolidate the two and simply report what the catalog reports.
+    sqlContext.sessionState.catalog.lookupRelation(tableId) match {
+      case table: MetastoreRelation =>
+        // Trying to mimic the format of Hive's output. But not exactly the same.
+        var results: Seq[(String, String, String)] = Nil
+
+        val columns: Seq[FieldSchema] = table.hiveQlTable.getCols.asScala
+        val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols.asScala
+        results ++= columns.map(field => (field.getName, field.getType, field.getComment))
+        if (partitionColumns.nonEmpty) {
+          val partColumnInfo =
+            partitionColumns.map(field => (field.getName, field.getType, field.getComment))
+          results ++=
+            partColumnInfo ++
+              Seq(("# Partition Information", "", "")) ++
+              Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++
+              partColumnInfo
+        }
+
+        if (isExtended) {
+          results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
+        }
+
+        results.map { case (name, dataType, comment) =>
+          Row(name, dataType, comment)
+        }
+
+      case o: LogicalPlan =>
+        DescribeCommand(tableId, output, isExtended).run(sqlContext)
     }
   }
 }
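
For concreteness, the rows the metastore branch produces for a hypothetical Hive table t(a INT, b STRING) PARTITIONED BY (dt STRING). Partition columns appear twice, once after the data columns and once under the partition header, mirroring Hive's own DESCRIBE output (comments shown as null, which is what getComment returns when none is set):

    a                    int        null
    b                    string     null
    dt                   string     null
    # Partition Information
    # col_name           data_type  comment
    dt                   string     null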