Commit c6610a99 authored by Takeshi Yamamuro, committed by gatorsmile

[SPARK-22122][SQL] Use analyzed logical plans to count input rows in TPCDSQueryBenchmark

## What changes were proposed in this pull request?
The current code ignores WITH clauses when collecting the input relations of TPCDS queries, which leads to inaccurate per-row processing times in the benchmark results. For example, with this fix all the input relations of `q2` are caught: `web_sales`, `date_dim`, and `catalog_sales` (the current code catches `date_dim` only). About one-third of the TPCDS queries use WITH clauses, so I think it is worth fixing this.
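
For illustration, the difference is easy to reproduce. Below is a minimal sketch (not part of the patch; it assumes a local `SparkSession`, a throwaway temp view standing in for a TPCDS table, and the Spark 2.x plan APIs this patch is written against — the pattern signatures differ in later versions). The parser keeps CTE definitions in the `With` node's `cteRelations` field rather than in its child tree, so traversing `queryExecution.logical` only sees the CTE alias, while the analyzed plan has the CTE substituted and exposes the underlying relation:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias

val spark = SparkSession.builder().master("local[1]").appName("cte-demo").getOrCreate()
spark.range(10).createOrReplaceTempView("web_sales")  // stand-in for a real TPCDS table

val qe = spark.sql(
  "WITH ws AS (SELECT * FROM web_sales) SELECT COUNT(*) FROM ws").queryExecution

// Parsed plan: `With` keeps its CTE definitions out of the child tree, so a
// tree traversal finds only the alias `ws`, never `web_sales`.
val parsed = qe.logical.collect { case UnresolvedRelation(t) => t.table }

// Analyzed plan: the analyzer inlines the CTE, so `web_sales` is visible.
// (The patch restricts the match to SubqueryAlias over LogicalRelation; it is
// relaxed here because the demo view is backed by Range, not a file source.)
val analyzed = qe.analyzed.collect { case SubqueryAlias(alias, _) => alias }

println(s"parsed: $parsed")      // List(ws)
println(s"analyzed: $analyzed")  // List(ws, web_sales)
```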

## How was this patch tested?
Manually checked.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #19344 from maropu/RespectWithInTPCDSBench.
parent 530fe683
@@ -20,11 +20,10 @@ package org.apache.spark.sql.execution.benchmark
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.util.Benchmark
 
 /**
@@ -66,24 +65,15 @@ object TPCDSQueryBenchmark extends Logging {
         classLoader = Thread.currentThread().getContextClassLoader)
 
       // This is an indirect hack to estimate the size of each query's input by traversing the
-      // logical plan and adding up the sizes of all tables that appear in the plan. Note that this
-      // currently doesn't take WITH subqueries into account which might lead to fairly inaccurate
-      // per-row processing time for those cases.
+      // logical plan and adding up the sizes of all tables that appear in the plan.
       val queryRelations = scala.collection.mutable.HashSet[String]()
-      spark.sql(queryString).queryExecution.logical.map {
-        case UnresolvedRelation(t: TableIdentifier) =>
-          queryRelations.add(t.table)
-        case lp: LogicalPlan =>
-          lp.expressions.foreach { _ foreach {
-            case subquery: SubqueryExpression =>
-              subquery.plan.foreach {
-                case UnresolvedRelation(t: TableIdentifier) =>
-                  queryRelations.add(t.table)
-                case _ =>
-              }
-            case _ =>
-          }
-        }
-      }
+      spark.sql(queryString).queryExecution.analyzed.foreach {
+        case SubqueryAlias(alias, _: LogicalRelation) =>
+          queryRelations.add(alias)
+        case LogicalRelation(_, _, Some(catalogTable), _) =>
+          queryRelations.add(catalogTable.identifier.table)
+        case HiveTableRelation(tableMeta, _, _) =>
+          queryRelations.add(tableMeta.identifier.table)
+        case _ =>
+      }
       val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
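
For reference, the new traversal can be read as a standalone helper over an analyzed plan. A hedged sketch follows (not part of the commit; `inputRelationNames` is a hypothetical name, and `tableSizes` is the benchmark's existing table-name-to-row-count map), using the same three cases as the diff above:

```scala
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.LogicalRelation

// Collects the names of all base relations appearing in an *analyzed* plan.
def inputRelationNames(plan: LogicalPlan): Set[String] = {
  val names = scala.collection.mutable.HashSet[String]()
  plan.foreach {
    // A temp view registered over a data source relation keeps its alias.
    case SubqueryAlias(alias, _: LogicalRelation) =>
      names += alias
    // A data source table resolved through the catalog.
    case LogicalRelation(_, _, Some(catalogTable), _) =>
      names += catalogTable.identifier.table
    // A Hive-serde table.
    case HiveTableRelation(tableMeta, _, _) =>
      names += tableMeta.identifier.table
    case _ =>
  }
  names.toSet
}

// Usage, mirroring the benchmark's per-query row estimate:
//   val numRows = inputRelationNames(qe.analyzed).map(tableSizes.getOrElse(_, 0L)).sum
```

Matching on the analyzed plan rather than the parsed one works because CTE substitution has already happened by that point, so relations referenced only inside WITH clauses appear as ordinary leaf relations.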