diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index 8f8138d6ebebce5bbe1ba883d10168acc4532efc..e0ead23d786f9bb70e280e2be193bbf443953fad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -33,6 +33,10 @@ private[sql] case class Partition(values: Row, path: String)
 
 private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
 
+private[sql] object PartitionSpec {
+  val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
+}
+
 private[sql] object PartitioningUtils {
   // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
   // depend on Hive.
@@ -68,20 +72,37 @@ private[sql] object PartitioningUtils {
   private[sql] def parsePartitions(
       paths: Seq[Path],
       defaultPartitionName: String): PartitionSpec = {
-    val partitionValues = resolvePartitions(paths.flatMap(parsePartition(_, defaultPartitionName)))
-    val fields = {
-      val (PartitionValues(columnNames, literals)) = partitionValues.head
-      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
-        StructField(name, dataType, nullable = true)
-      }
+    // First, we need to parse every partition's path and see if we can find partition values.
+    val pathsWithPartitionValues = paths.flatMap { path =>
+      parsePartition(path, defaultPartitionName).map(path -> _)
     }
 
-    val partitions = partitionValues.zip(paths).map {
-      case (PartitionValues(_, literals), path) =>
-        Partition(Row(literals.map(_.value): _*), path.toString)
-    }
+    if (pathsWithPartitionValues.isEmpty) {
+      // This dataset is not partitioned.
+      PartitionSpec.emptySpec
+    } else {
+      // This dataset is partitioned. We need to check whether all partitions have the same
+      // partition columns and resolve potential type conflicts.
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
+
+      // Creates the StructType which represents the partition columns.
+      val fields = {
+        val PartitionValues(columnNames, literals) = resolvedPartitionValues.head
+        columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
+          // We always assume partition columns are nullable since we've no idea whether null values
+          // will be appended in the future.
+          StructField(name, dataType, nullable = true)
+        }
+      }
+
+      // Finally, we create `Partition`s based on paths and resolved partition values.
+      val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
+        case (PartitionValues(_, literals), (path, _)) =>
+          Partition(Row.fromSeq(literals.map(_.value)), path.toString)
+      }
 
-    PartitionSpec(StructType(fields), partitions)
+      PartitionSpec(StructType(fields), partitions)
+    }
   }
 
   /**
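Note (illustrative, not part of the patch): with the reworked parsePartitions above, leaf directories carrying `column=value` segments still yield a populated PartitionSpec, while a flat layout now hits the PartitionSpec.emptySpec branch instead of failing on `partitionValues.head`. A minimal sketch, assuming it runs somewhere under the org.apache.spark.sql package (the object is private[sql]); the paths are made up:

    import org.apache.hadoop.fs.Path

    // Partitioned layout: both leaf directories carry `a=...` and `b=...` segments.
    val partitionedDirs = Seq(
      new Path("hdfs://host:9000/path/a=1/b=hello"),
      new Path("hdfs://host:9000/path/a=2/b=world"))
    val spec = PartitioningUtils.parsePartitions(partitionedDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
    assert(spec.partitionColumns.map(_.name) == Seq("a", "b"))

    // Flat layout: no `column=value` segments anywhere, so nothing is parsed and the
    // new early-return branch produces the shared empty spec.
    val flatDirs = Seq(new Path("hdfs://host:9000/other/path"))
    val emptySpec = PartitioningUtils.parsePartitions(flatDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
    assert(emptySpec == PartitionSpec.emptySpec)
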
@@ -111,7 +132,7 @@ private[sql] object PartitioningUtils {
     while (!finished) {
       // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
       // uncleaned. Here we simply ignore them.
-      if (chopped.getName == "_temporary") {
+      if (chopped.getName.toLowerCase == "_temporary") {
         return None
       }
 
@@ -121,8 +142,12 @@ private[sql] object PartitioningUtils {
       finished = maybeColumn.isEmpty || chopped.getParent == null
     }
 
-    val (columnNames, values) = columns.reverse.unzip
-    Some(PartitionValues(columnNames, values))
+    if (columns.isEmpty) {
+      None
+    } else {
+      val (columnNames, values) = columns.reverse.unzip
+      Some(PartitionValues(columnNames, values))
+    }
   }
 
   private def parsePartitionColumn(
@@ -156,20 +181,25 @@ private[sql] object PartitioningUtils {
   private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
     // Column names of all partitions must match
     val distinctPartitionsColNames = values.map(_.columnNames).distinct
-    assert(distinctPartitionsColNames.size == 1, {
-      val list = distinctPartitionsColNames.mkString("\t", "\n", "")
-      s"Conflicting partition column names detected:\n$list"
-    })
-
-    // Resolves possible type conflicts for each column
-    val columnCount = values.head.columnNames.size
-    val resolvedValues = (0 until columnCount).map { i =>
-      resolveTypeConflicts(values.map(_.literals(i)))
-    }
-
-    // Fills resolved literals back to each partition
-    values.zipWithIndex.map { case (d, index) =>
-      d.copy(literals = resolvedValues.map(_(index)))
+
+    if (distinctPartitionsColNames.isEmpty) {
+      Seq.empty
+    } else {
+      assert(distinctPartitionsColNames.size == 1, {
+        val list = distinctPartitionsColNames.mkString("\t", "\n", "")
+        s"Conflicting partition column names detected:\n$list"
+      })
+
+      // Resolves possible type conflicts for each column
+      val columnCount = values.head.columnNames.size
+      val resolvedValues = (0 until columnCount).map { i =>
+        resolveTypeConflicts(values.map(_.literals(i)))
+      }
+
+      // Fills resolved literals back to each partition
+      values.zipWithIndex.map { case (d, index) =>
+        d.copy(literals = resolvedValues.map(_(index)))
+      }
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6a917bf38b13953d16bf246fb67d99f9d2b01c45..fcbac0d4579502f99e93e8c16391b3aa294ddba4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -462,12 +462,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
   private def discoverPartitions(): PartitionSpec = {
     val leafDirs = fileStatusCache.leafDirs.keys.toSeq
-
-    if (leafDirs.nonEmpty) {
-      PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
-    } else {
-      PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
-    }
+    PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
   }
 
   /**
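Note (illustrative, not part of the patch): the parsePartition changes above alter per-path behaviour in two ways: `_temporary` directories are now skipped case-insensitively, and a path without any `column=value` segment yields None instead of an empty PartitionValues. A rough sketch, assuming it sits in a test under the org.apache.spark.sql package (parsePartition is private[sql]) and using a placeholder default partition name; the expected results mirror the check(..., None) cases in the suite that follows:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.sources.PartitioningUtils.parsePartition

    val defaultName = "__default__"  // placeholder; the value is irrelevant for these paths

    // `_temporary` (any casing) anywhere on the walk up the path aborts parsing.
    assert(parsePartition(new Path("file:///path/_TeMpOrArY/a=1"), defaultName).isEmpty)

    // No `column=value` segment at all: previously Some(empty PartitionValues), now None.
    assert(parsePartition(new Path("file:///path/something"), defaultName).isEmpty)

    // A genuinely partitioned path still parses into column names and literals.
    assert(parsePartition(new Path("file:///path/a=10/b=hello"), defaultName).isDefined)
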
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 1927114b8d58f9fd510fb2c3d216a26a35dce12c..907dbb0119b407677c5bdb08be4a303730f2835a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.sources.PartitioningUtils._
-import org.apache.spark.sql.sources.{Partition, PartitionSpec}
+import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, Row, SQLContext}
@@ -66,12 +66,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       assert(message.contains(expected))
     }
 
-    check("file:///", Some {
-      PartitionValues(
-        ArrayBuffer.empty[String],
-        ArrayBuffer.empty[Literal])
-    })
-
     check("file://path/a=10", Some {
       PartitionValues(
         ArrayBuffer("a"),
@@ -93,6 +87,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
         ArrayBuffer(Literal.create(1.5, FloatType)))
     })
 
+    check("file:///", None)
+    check("file:///path/_temporary", None)
+    check("file:///path/_temporary/c=1.5", None)
+    check("file:///path/_temporary/path", None)
     check("file://path/a=10/_temporary/c=1.5", None)
     check("file://path/a=10/c=1.5/_temporary", None)
 
@@ -124,6 +122,25 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
          Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"),
          Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
 
+    check(Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello",
+      "hdfs://host:9000/path/a=10.5/_temporary",
+      "hdfs://host:9000/path/a=10.5/_TeMpOrArY",
+      "hdfs://host:9000/path/a=10.5/b=hello/_temporary",
+      "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY",
+      "hdfs://host:9000/path/_temporary/path",
+      "hdfs://host:9000/path/a=11/_temporary/path",
+      "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", FloatType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"),
+          Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
+
     check(Seq(
       s"hdfs://host:9000/path/a=10/b=20",
       s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"),
@@ -145,6 +162,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
         Seq(
           Partition(Row(10, null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
           Partition(Row(10.5, null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))
+
+    check(Seq(
+      s"hdfs://host:9000/path1",
+      s"hdfs://host:9000/path2"),
+      PartitionSpec.emptySpec)
   }
 
   test("read partitioned table - normal case") {
@@ -334,4 +356,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       }
     }
   }
+
+  test("SPARK-7749 Non-partitioned table should have empty partition spec") {
+    withTempPath { dir =>
+      (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
+      val queryExecution = read.parquet(dir.getCanonicalPath).queryExecution
+      queryExecution.analyzed.collectFirst {
+        case LogicalRelation(relation: ParquetRelation2) =>
+          assert(relation.partitionSpec === PartitionSpec.emptySpec)
+      }.getOrElse {
+        fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
+      }
+    }
+  }
 }
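Note (illustrative, not part of the patch): as a counterpart to the new non-partitioned test above, the same collectFirst pattern applied to a partitioned write would be expected to surface one discovered partition column. The use of partitionBy and the column names below are assumptions for this sketch, which would live in the same suite:

    withTempPath { dir =>
      (1 to 10).map(i => (i, i.toString)).toDF("a", "b")
        .write.partitionBy("a").parquet(dir.getCanonicalPath)

      val queryExecution = read.parquet(dir.getCanonicalPath).queryExecution
      queryExecution.analyzed.collectFirst {
        case LogicalRelation(relation: ParquetRelation2) =>
          // Partition discovery should pick up `a` from the a=<value> directories.
          assert(relation.partitionSpec.partitionColumns.map(_.name) == Seq("a"))
      }.getOrElse {
        fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
      }
    }
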
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 05d99983b6a63e85a5f63f33d18e9e9ef073fc5f..1da990bc959ba29c5f2a879159b2111db3bab857 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
@@ -29,7 +30,7 @@ import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
 import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
+import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
 import org.apache.spark.util.Utils
 
 // The data where the partitioning key exists only in the directory structure.
@@ -385,6 +386,54 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     sql("DROP TABLE ms_convert")
   }
 
+  def collectParquetRelation(df: DataFrame): ParquetRelation2 = {
+    val plan = df.queryExecution.analyzed
+    plan.collectFirst {
+      case LogicalRelation(r: ParquetRelation2) => r
+    }.getOrElse {
+      fail(s"Expecting a ParquetRelation2, but got:\n$plan")
+    }
+  }
+
+  test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
+    sql(
+      s"""CREATE TABLE nonPartitioned (
+         |  key INT,
+         |  value STRING
+         |)
+         |STORED AS PARQUET
+       """.stripMargin)
+
+    // First lookup fills the cache
+    val r1 = collectParquetRelation(table("nonPartitioned"))
+    // Second lookup should reuse the cache
+    val r2 = collectParquetRelation(table("nonPartitioned"))
+    // They should be the same instance
+    assert(r1 eq r2)
+
+    sql("DROP TABLE nonPartitioned")
+  }
+
+  test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
+    sql(
+      s"""CREATE TABLE partitioned (
+         |  key INT,
+         |  value STRING
+         |)
+         |PARTITIONED BY (part INT)
+         |STORED AS PARQUET
+       """.stripMargin)
+
+    // First lookup fills the cache
+    val r1 = collectParquetRelation(table("partitioned"))
+    // Second lookup should reuse the cache
+    val r2 = collectParquetRelation(table("partitioned"))
+    // They should be the same instance
+    assert(r1 eq r2)
+
+    sql("DROP TABLE partitioned")
+  }
+
   test("Caching converted data source Parquet Relations") {
     def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
       // Converted test_parquet should be cached.
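Note (illustrative, not part of the patch): both SPARK-7749 metastore tests above rely on reference equality to prove the second lookup is served from the conversion cache rather than rebuilding the relation. The same check condensed into a helper; the helper name is ours, while collectParquetRelation and table come from the suite above:

    // Hypothetical helper: looks the table up twice and requires the exact same
    // ParquetRelation2 instance both times, i.e. a cache hit on the second lookup.
    def assertConversionCached(tableName: String): Unit = {
      val first = collectParquetRelation(table(tableName))
      val second = collectParquetRelation(table(tableName))
      // `eq` compares object identity, so this only passes if the relation was reused.
      assert(first eq second, s"Converted relation for $tableName was rebuilt instead of reused")
    }

    assertConversionCached("nonPartitioned")
    assertConversionCached("partitioned")
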