From c6610a997f69148a1f1bbf69360e8f39e24cb70a Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro <yamamuro@apache.org>
Date: Fri, 29 Sep 2017 21:36:52 -0700
Subject: [PATCH] [SPARK-22122][SQL] Use analyzed logical plans to count input
 rows in TPCDSQueryBenchmark

## What changes were proposed in this pull request?
Since the current code ignores WITH clauses when collecting the input relations of TPCDS queries, the per-row processing times in the benchmark results can be fairly inaccurate. For example, in `q2` this fix catches all three input relations, `web_sales`, `date_dim`, and `catalog_sales`, whereas the current code catches `date_dim` only. About one-third of the TPCDS queries use WITH clauses, so I think this is worth fixing.
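
To illustrate why the analyzed plan is needed, here is a minimal, self-contained sketch (it uses tiny temp views as stand-ins for the real TPCDS tables, and the local session setup is assumed): in the parsed plan the CTE body is not part of the main query's child tree, so a plain tree traversal never visits the tables referenced only inside the WITH clause, while the analyzer substitutes the CTE into the query so every base relation shows up as a plan node.

```scala
import org.apache.spark.sql.SparkSession

object WithClauseRelationsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("with-clause-relations")
      .getOrCreate()
    import spark.implicits._

    // Tiny stand-ins for two of the TPCDS tables referenced by q2
    Seq((1, 1.0)).toDF("sold_date_sk", "sales_price")
      .createOrReplaceTempView("web_sales")
    Seq((1, "2001")).toDF("d_date_sk", "d_year")
      .createOrReplaceTempView("date_dim")

    val q =
      """WITH ws AS (SELECT sold_date_sk FROM web_sales)
        |SELECT count(*) FROM ws, date_dim
        |WHERE ws.sold_date_sk = date_dim.d_date_sk""".stripMargin

    // Parsed plan: `web_sales` lives only in the CTE definition, which a
    // traversal scanning for UnresolvedRelation nodes does not reach.
    println(spark.sql(q).queryExecution.logical.treeString)

    // Analyzed plan: the analyzer inlines the CTE, so both `web_sales`
    // and `date_dim` appear as resolved relation nodes in the tree.
    println(spark.sql(q).queryExecution.analyzed.treeString)

    spark.stop()
  }
}
```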

## How was this patch tested?
Manually checked.
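
Concretely, a spot check can print the collected relation names per query and compare them before/after this patch. A sketch of a fragment one could drop into the benchmark loop (assuming the loop's query name variable is `name`; `queryRelations` is the set built in the code below):

```scala
// With the old parsed-plan traversal this printed only `date_dim` for q2;
// with the analyzed-plan traversal it prints all three input relations:
// catalog_sales, date_dim, web_sales.
println(s"$name: " + queryRelations.toSeq.sorted.mkString(", "))
```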

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #19344 from maropu/RespectWithInTPCDSBench.
---
 .../benchmark/TPCDSQueryBenchmark.scala       | 32 +++++++------------
 1 file changed, 11 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index 99c6df7389..69247d7f4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -20,11 +20,10 @@ package org.apache.spark.sql.execution.benchmark
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.util.Benchmark
 
 /**
@@ -66,24 +65,15 @@ object TPCDSQueryBenchmark extends Logging {
         classLoader = Thread.currentThread().getContextClassLoader)
 
       // This is an indirect hack to estimate the size of each query's input by traversing the
-      // logical plan and adding up the sizes of all tables that appear in the plan. Note that this
-      // currently doesn't take WITH subqueries into account which might lead to fairly inaccurate
-      // per-row processing time for those cases.
+      // logical plan and adding up the sizes of all tables that appear in the plan.
       val queryRelations = scala.collection.mutable.HashSet[String]()
-      spark.sql(queryString).queryExecution.logical.map {
-        case UnresolvedRelation(t: TableIdentifier) =>
-          queryRelations.add(t.table)
-        case lp: LogicalPlan =>
-          lp.expressions.foreach { _ foreach {
-            case subquery: SubqueryExpression =>
-              subquery.plan.foreach {
-                case UnresolvedRelation(t: TableIdentifier) =>
-                  queryRelations.add(t.table)
-                case _ =>
-              }
-            case _ =>
-          }
-        }
+      spark.sql(queryString).queryExecution.analyzed.foreach {
+        case SubqueryAlias(alias, _: LogicalRelation) =>
+          queryRelations.add(alias)
+        case LogicalRelation(_, _, Some(catalogTable), _) =>
+          queryRelations.add(catalogTable.identifier.table)
+        case HiveTableRelation(tableMeta, _, _) =>
+          queryRelations.add(tableMeta.identifier.table)
         case _ =>
       }
       val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
-- 
GitLab