Skip to content
Snippets Groups Projects
Commit 8b5b2e27 authored by lianhuiwang's avatar lianhuiwang Committed by Wenchen Fan
Browse files

[SPARK-20986][SQL] Reset table's statistics after PruneFileSourcePartitions rule.

## What changes were proposed in this pull request?
After the PruneFileSourcePartitions rule runs, the table's statistics need to be reset, because the rule can filter out unnecessary partitions. The statistics must therefore be updated to reflect only the partitions that remain.

## How was this patch tested?
Added a unit test.

Author: lianhuiwang <lianhuiwang09@gmail.com>

Closes #18205 from lianhuiwang/SPARK-20986.
parent 9eb09524
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
......@@ -59,8 +60,11 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
val prunedFsRelation =
fsRelation.copy(location = prunedFileIndex)(sparkSession)
val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
// Change table stats based on the sizeInBytes of pruned files
val withStats = logicalRelation.catalogTable.map(_.copy(
stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)))))
val prunedLogicalRelation = logicalRelation.copy(
relation = prunedFsRelation, catalogTable = withStats)
// Keep partition-pruning predicates so that they are visible in physical planning
val filterExpression = filters.reduceLeft(And)
val filter = Filter(filterExpression, prunedLogicalRelation)
......
......@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
......@@ -66,4 +67,28 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
}
}
}
// Regression test for SPARK-20986: after partition pruning, the relation in the optimized
// plan must carry statistics sized to the pruned files rather than to the whole table.
test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
  withTable("tbl") {
    // Ten rows spread over the partition values produced by `id % 3`, then analyzed so
    // that the catalog holds table-level statistics.
    spark.range(10)
      .selectExpr("id", "id % 3 as p")
      .write
      .partitionBy("p")
      .saveAsTable("tbl")
    sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS")

    val catalogStats =
      spark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl")).stats
    assert(catalogStats.isDefined && catalogStats.get.sizeInBytes > 0, "tableStats is lost")
    val fullTableSize = catalogStats.get.sizeInBytes

    val df = sql("SELECT * FROM tbl WHERE p = 1")

    // Before optimization the relation still reports the size of the whole table.
    val analyzedSizes = df.queryExecution.analyzed.collect {
      case rel: LogicalRelation => rel.catalogTable.get.stats.get.sizeInBytes
    }
    assert(analyzedSizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
    assert(analyzedSizes(0) == fullTableSize)

    // After optimization the single relation's computed stats must agree with the stats
    // stored on its catalog table, and be strictly smaller than the full table size.
    val optimizedRelations = df.queryExecution.optimizedPlan.collect {
      case rel: LogicalRelation => rel
    }
    assert(optimizedRelations.size === 1, s"Size wrong for:\n ${df.queryExecution}")
    val prunedRelation = optimizedRelations(0)
    val prunedSize = prunedRelation.computeStats(conf).sizeInBytes
    assert(prunedSize == prunedRelation.catalogTable.get.stats.get.sizeInBytes)
    assert(prunedSize < fullTableSize)
  }
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment