diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index d4447ca32d5a0cffb0a849463f19e19700d2d24c..6784c3ae1d7e71abc0d093f06d9cdac9a5a60da5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -264,7 +264,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
     expressions.flatMap(_.collect {case e: SubqueryExpression => e.plan.asInstanceOf[PlanType]})
   }
 
-  override def innerChildren: Seq[PlanType] = subqueries
+  override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
 
   /**
    * Canonicalized copy of this query plan.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index d87e6c76ed734c8f3e041717af6edb68145591b8..3ebd815dce32c5a2e41c003541445b150a19c9bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -424,9 +424,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
    */
   protected def stringArgs: Iterator[Any] = productIterator
 
+  private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]]
+
   /** Returns a string representing the arguments to this node, minus any children */
   def argString: String = productIterator.flatMap {
-    case tn: TreeNode[_] if containsChild(tn) => Nil
+    case tn: TreeNode[_] if allChildren.contains(tn) => Nil
+    case Some(tn: TreeNode[_]) if allChildren.contains(tn) => Nil
     case tn: TreeNode[_] => s"${tn.simpleString}" :: Nil
     case seq: Seq[BaseType] if seq.toSet.subsetOf(children.toSet) => Nil
     case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil
@@ -467,9 +470,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   }
 
   /**
-   * All the nodes that are parts of this node, this is used by subquries.
+   * All the nodes that should be shown as an inner nested tree of this node.
+   * For example, this can be used to show sub-queries.
    */
-  protected def innerChildren: Seq[BaseType] = Nil
+  protected def innerChildren: Seq[TreeNode[_]] = Seq.empty
 
   /**
    * Appends the string represent of this node and its children to the given StringBuilder.
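Reviewer note: the display change above is easiest to see on a toy tree. The sketch below is a minimal, self-contained model, not Spark's actual `TreeNode` (`Node`, `Leaf`, and `Filter` are made-up names): `innerChildren` render as a nested sub-tree at extra indentation, while regular `children` continue the main tree.

```scala
// Minimal model of the innerChildren rendering idea (not Spark code).
abstract class Node {
  def name: String
  def children: Seq[Node] = Nil
  // Nodes that belong to this node but are not regular children, e.g. subqueries.
  protected def innerChildren: Seq[Node] = Nil

  final def treeString: String = build(new StringBuilder, depth = 0).toString

  private def build(sb: StringBuilder, depth: Int): StringBuilder = {
    sb.append("  " * depth).append(name).append('\n')
    // Inner children get one extra indent level to mark them as a nested tree.
    innerChildren.foreach(_.build(sb, depth + 2))
    children.foreach(_.build(sb, depth + 1))
    sb
  }
}

case class Leaf(name: String) extends Node
case class Filter(name: String, subquery: Node, child: Node) extends Node {
  override def children: Seq[Node] = Seq(child)
  override protected def innerChildren: Seq[Node] = Seq(subquery)
}

object Demo extends App {
  print(Filter("Filter", subquery = Leaf("Subquery"), child = Leaf("Scan")).treeString)
  // Filter
  //     Subquery   <- inner child, shown as a nested tree
  //   Scan         <- regular child
}
```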
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index ba61940b3d5a4c268629172257f4715ece3810d4..7ccc9de9db233244b9dd802715be2e0c9e686c62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -70,6 +71,8 @@ private[sql] case class InMemoryRelation(
     private[sql] var _batchStats: ListAccumulator[InternalRow] = null)
   extends logical.LeafNode with MultiInstanceRelation {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+
   override def producedAttributes: AttributeSet = outputSet
 
   private[sql] val batchStats: ListAccumulator[InternalRow] =
@@ -222,6 +225,8 @@ private[sql] case class InMemoryTableScanExec(
     @transient relation: InMemoryRelation)
   extends LeafExecNode {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
+
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index b1290a4759a256d17021ae9922ac318e29308e0c..3e5eed2efa76b0e41932ea6f37d592da5e2153cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -19,15 +19,19 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
-
 case class CacheTableCommand(
     tableName: String,
     plan: Option[LogicalPlan],
     isLazy: Boolean) extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = {
+    plan.toSeq
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     plan.foreach { logicalPlan =>
       Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 642a95a99262d6f227dcd9e5d2ef35ab9422b93d..38bb6e412f753fe08fc97e5353bcaffa55d18502 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
@@ -57,6 +58,8 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
     cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
   }
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
+
   override def output: Seq[Attribute] = cmd.output
 
   override def children: Seq[SparkPlan] = Nil
@@ -68,11 +71,8 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
 
   protected override def doExecute(): RDD[InternalRow] = {
     sqlContext.sparkContext.parallelize(sideEffectResult, 1)
   }
-
-  override def argString: String = cmd.toString
 }
-
 /**
  * An explain command for users to see how a command will be executed.
  *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 9956c5b09236dc8e4f00479533b3af80cff9a4cd..66753fa7f27bc84ca461d031a19c29087f20dda3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.HiveSerDe
@@ -138,6 +139,8 @@ case class CreateDataSourceTableAsSelectCommand(
     query: LogicalPlan)
   extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // Since we are saving metadata to metastore, we need to check if metastore supports
     // the table name and database name we have for this query. MetaStoreUtils.validateName
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 20c02786ecc5ccf3b1804846927976137ef79751..b56c200e9e98be8f416a59483761b04366b0ed0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 
 
@@ -50,6 +51,8 @@ case class CreateViewCommand(
     isTemporary: Boolean)
   extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+
   // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
   // different from Hive and may not work for some cases like create view on self join.
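With the hook in place, exposing a wrapped plan from any new command is a one-line override. Below is a sketch under stated assumptions: `MyAuditCommand` is hypothetical (not part of this change), and it must live inside Spark's own `org.apache.spark.sql.execution.command` package because `RunnableCommand` is `private[sql]`.

```scala
package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical command wrapping a logical plan; for illustration only.
case class MyAuditCommand(tableName: String, query: LogicalPlan)
  extends RunnableCommand {

  // Surface the wrapped plan so EXPLAIN renders it as a nested tree.
  // TreeNode.argString now skips anything in (children ++ innerChildren),
  // so the plan is not duplicated in the node's one-line summary.
  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)

  // No-op run; a real command would do its side effect here.
  override def run(sparkSession: SparkSession): Seq[Row] = Seq.empty
}
```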
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 25b901f2db8d0ce05dbb020cd57370da00a0002e..8549ae96e2f39c5afb9c13760a372b5139708d29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.sources.InsertableRelation
@@ -32,6 +33,8 @@ private[sql] case class InsertIntoDataSourceCommand(
     overwrite: Boolean)
   extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = Dataset.ofRows(sparkSession, query)
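For a quick manual check, something along these lines should now print the cached query's plan as a nested tree under `ExecutedCommandExec`/`CacheTableCommand` instead of flattening it into the node's argument string. This is an illustrative sketch; the object name and view name are made up, and the exact tree output is not reproduced here.

```scala
import org.apache.spark.sql.SparkSession

// Local smoke test for the nested plan display of commands.
object InnerChildrenDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
  spark.range(10).createOrReplaceTempView("t")

  // CACHE TABLE ... AS SELECT goes through CacheTableCommand, whose
  // innerChildren now expose the SELECT's logical plan.
  val qe = spark.sql("CACHE TABLE cached AS SELECT id FROM t").queryExecution
  println(qe.executedPlan.treeString) // look for the nested SELECT sub-plan

  spark.stop()
}
```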