diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 1a57a7707caa14a6e130f4d54a0f2cb7bc0483f7..a97ed701c4207ad25aa59ff115b88466c54f198c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -102,8 +102,8 @@ case class CatalogTablePartition(
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
   def toRow(partitionSchema: StructType): InternalRow = {
-    InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) =>
-      Cast(Literal(spec(name)), dataType).eval()
+    InternalRow.fromSeq(partitionSchema.map { field =>
+      Cast(Literal(spec(field.name)), field.dataType).eval()
     })
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 29121a47d92d1f9f81328a36cc56c8a861caaf3f..8689017c3ed75700504c282c9b2996cba3f23d4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,7 +59,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
         val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation =
           fsRelation.copy(location = prunedFileCatalog)(sparkSession)
-        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
+        val prunedLogicalRelation = logicalRelation.copy(
+          relation = prunedFsRelation,
+          expectedOutputAttributes = Some(logicalRelation.output))
 
         // Keep partition-pruning predicates so that they are visible in physical planning
         val filterExpression = filters.reduceLeft(And)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
index f65e74de87a574ce2f7ee9fe6508ae7f2ed0e887..15523437a3404b45a450031b89073556f051c8a6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.QueryTest
 
 class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   test("table name with schema") {
@@ -78,7 +79,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
   }
 
   test("lazy partition pruning reads only necessary partition data") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "true") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)
@@ -114,7 +115,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
   }
 
   test("all partitions read and cached when filesource partition pruning is off") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "false") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 2ca1cd4c07fdb574753ba825242808b5f99dddb8..d290fe9962db26c6a441b43198f34c7cc1aac863 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
 /**
@@ -62,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
 
   def testCaching(pruningEnabled: Boolean): Unit = {
     test(s"partitioned table is cached when partition pruning is $pruningEnabled") {
-      withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> pruningEnabled.toString) {
+      withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> pruningEnabled.toString) {
         withTable("test") {
           withTempDir { dir =>
             spark.range(5).selectExpr("id", "id as f1", "id as f2").write
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..346ea0ca4367ec78895022fe66f85795cc58f420
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
+
+class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("PruneFileSourcePartitions", Once, PruneFileSourcePartitions) :: Nil
+  }
+
+  test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
+    withTable("test") {
+      withTempDir { dir =>
+        sql(
+          s"""
+            |CREATE EXTERNAL TABLE test(i int)
+            |PARTITIONED BY (p int)
+            |STORED AS parquet
+            |LOCATION '${dir.getAbsolutePath}'""".stripMargin)
+
+        val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+        val tableFileCatalog = new TableFileCatalog(
+          spark,
+          tableMeta.database,
+          tableMeta.identifier.table,
+          Some(tableMeta.partitionSchema),
+          0)
+
+        val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+          tableMeta.partitionColumnNames.contains(f.name)
+        })
+        val relation = HadoopFsRelation(
+          location = tableFileCatalog,
+          partitionSchema = tableMeta.partitionSchema,
+          dataSchema = dataSchema,
+          bucketSpec = None,
+          fileFormat = new ParquetFileFormat(),
+          options = Map.empty)(sparkSession = spark)
+
+        val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta))
+        val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze
+
+        val optimized = Optimize.execute(query)
+        assert(optimized.missingInput.isEmpty)
+      }
+    }
+  }
+}
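For readers outside the Spark codebase, here is a minimal self-contained Scala sketch of the failure mode this patch addresses: rebuilding a plan node in a way that regenerates its output attribute ids leaves parent operators referencing attributes the child no longer produces, which is exactly what the new test checks via `missingInput`. The `Attr`, `Relation`, and `Filter` classes below are simplified hypothetical stand-ins for Catalyst's `AttributeReference`, `LogicalRelation`, and `Filter`, not the real API; pinning the output through the copy mirrors what passing `expectedOutputAttributes = Some(logicalRelation.output)` does in the rule above.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for Catalyst's ExprId generator (not the real Spark API).
object Ids {
  private val counter = new AtomicLong(0)
  def fresh(): Long = counter.getAndIncrement()
}

// An attribute is identified by name *and* id, like AttributeReference.
case class Attr(name: String, id: Long = Ids.fresh())

// Like LogicalRelation without expectedOutputAttributes, this relation mints
// fresh attribute ids every time it is constructed, unless output is pinned.
case class Relation(names: Seq[String], expectedOutput: Option[Seq[Attr]] = None) {
  val output: Seq[Attr] = expectedOutput.getOrElse(names.map(n => Attr(n)))
}

case class Filter(condition: Attr, child: Relation) {
  // Analogous to LogicalPlan.missingInput: references the child does not produce.
  def missingInput: Seq[Attr] = Seq(condition).filterNot(child.output.contains)
}

object Demo extends App {
  val rel = Relation(Seq("i", "p"))
  val filter = Filter(condition = rel.output.last, child = rel)

  // Rebuilding the child (as the old rule effectively did via .copy) regenerates
  // ids, so the filter's reference no longer resolves against the child's output.
  val broken = filter.copy(child = Relation(rel.names))
  assert(broken.missingInput.nonEmpty)

  // The fix: thread the original output through the copy so the ids are preserved.
  val fixed = filter.copy(child = Relation(rel.names, expectedOutput = Some(rel.output)))
  assert(fixed.missingInput.isEmpty)
}
```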