Commit 8730fbb4 authored by Cheng Lian, committed by Yin Huai

[SPARK-7749] [SQL] Fixes partition discovery for non-partitioned tables

When no partition columns can be found, we should have an empty `PartitionSpec`, rather than a `PartitionSpec` with empty partition columns.

This PR together with #6285 should fix SPARK-7749.
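As a concrete illustration of the new behaviour, here is a minimal sketch (not part of the patch): when a listing contains no `column=value` directories, `parsePartitions` now returns `PartitionSpec.emptySpec` instead of producing a spec with an empty schema but one `Partition` per base path. The snippet assumes it is compiled under the `org.apache.spark.sql.sources` package (the helpers are `private[sql]`); the object name and the hard-coded Hive default partition name are illustrative only.

```scala
package org.apache.spark.sql.sources

import org.apache.hadoop.fs.Path

// Illustrative sketch only -- exercises the behaviour described above.
object NonPartitionedSpecSketch {
  def main(args: Array[String]): Unit = {
    val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__" // assumed Hive default

    // Neither path contains a `column=value` segment, so no partition values are parsed
    // and the whole listing collapses to the canonical empty spec.
    val spec = PartitioningUtils.parsePartitions(
      Seq(new Path("hdfs://host:9000/path1"), new Path("hdfs://host:9000/path2")),
      defaultPartitionName)

    assert(spec == PartitionSpec.emptySpec)
  }
}
```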

Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #6287 from liancheng/spark-7749 and squashes the following commits:

a799ff3 [Cheng Lian] Adds test cases for SPARK-7749
c4949be [Cheng Lian] Minor refactoring, and tolerant _TEMPORARY directory name
5aa87ea [Yin Huai] Make parsePartitions more robust.
fc56656 [Cheng Lian] Returns empty PartitionSpec if no partition columns can be inferred
19ae41e [Cheng Lian] Don't list base directory as leaf directory
parent 13348e21
@@ -33,6 +33,10 @@ private[sql] case class Partition(values: Row, path: String)

 private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])

+private[sql] object PartitionSpec {
+  val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
+}
+
 private[sql] object PartitioningUtils {
   // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
   // depend on Hive.
@@ -68,20 +72,37 @@ private[sql] object PartitioningUtils {
   private[sql] def parsePartitions(
       paths: Seq[Path],
       defaultPartitionName: String): PartitionSpec = {
-    val partitionValues = resolvePartitions(paths.flatMap(parsePartition(_, defaultPartitionName)))
-
-    val fields = {
-      val (PartitionValues(columnNames, literals)) = partitionValues.head
-      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
-        StructField(name, dataType, nullable = true)
-      }
-    }
+    // First, we need to parse every partition's path and see if we can find partition values.
+    val pathsWithPartitionValues = paths.flatMap { path =>
+      parsePartition(path, defaultPartitionName).map(path -> _)
+    }
-    val partitions = partitionValues.zip(paths).map {
-      case (PartitionValues(_, literals), path) =>
-        Partition(Row(literals.map(_.value): _*), path.toString)
-    }
+    if (pathsWithPartitionValues.isEmpty) {
+      // This dataset is not partitioned.
+      PartitionSpec.emptySpec
+    } else {
+      // This dataset is partitioned. We need to check whether all partitions have the same
+      // partition columns and resolve potential type conflicts.
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
+
+      // Creates the StructType which represents the partition columns.
+      val fields = {
+        val PartitionValues(columnNames, literals) = resolvedPartitionValues.head
+        columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
+          // We always assume partition columns are nullable since we've no idea whether null values
+          // will be appended in the future.
+          StructField(name, dataType, nullable = true)
+        }
+      }
+
+      // Finally, we create `Partition`s based on paths and resolved partition values.
+      val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
+        case (PartitionValues(_, literals), (path, _)) =>
+          Partition(Row.fromSeq(literals.map(_.value)), path.toString)
+      }
-    PartitionSpec(StructType(fields), partitions)
+      PartitionSpec(StructType(fields), partitions)
+    }
   }

   /**
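For contrast, a hedged usage sketch of the rewritten method on a genuinely partitioned layout, mirroring the test expectations further down in this diff (same packaging assumption as the earlier sketch; the object name is illustrative):

```scala
package org.apache.spark.sql.sources

import org.apache.hadoop.fs.Path

// Illustrative sketch only -- partitioned input still yields a populated PartitionSpec.
object PartitionedSpecSketch {
  def main(args: Array[String]): Unit = {
    val spec = PartitioningUtils.parsePartitions(
      Seq(
        new Path("hdfs://host:9000/path/a=10/b=20"),
        new Path("hdfs://host:9000/path/a=10.5/b=hello"),
        // Leftover temporary directories (any casing) are skipped rather than parsed.
        new Path("hdfs://host:9000/path/a=10.5/_TeMpOrArY")),
      "__HIVE_DEFAULT_PARTITION__") // assumed Hive default partition name

    // Per the test expectations below: `a` resolves to FloatType (10 vs 10.5), `b` to
    // StringType, and only the two data directories become Partitions.
    assert(spec.partitionColumns.fieldNames.toSeq == Seq("a", "b"))
    assert(spec.partitions.size == 2)
  }
}
```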
@@ -111,7 +132,7 @@ private[sql] object PartitioningUtils {
     while (!finished) {
       // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
       // uncleaned. Here we simply ignore them.
-      if (chopped.getName == "_temporary") {
+      if (chopped.getName.toLowerCase == "_temporary") {
         return None
       }
@@ -121,8 +142,12 @@ private[sql] object PartitioningUtils {
       finished = maybeColumn.isEmpty || chopped.getParent == null
     }

-    val (columnNames, values) = columns.reverse.unzip
-    Some(PartitionValues(columnNames, values))
+    if (columns.isEmpty) {
+      None
+    } else {
+      val (columnNames, values) = columns.reverse.unzip
+      Some(PartitionValues(columnNames, values))
+    }
   }

   private def parsePartitionColumn(
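Taken together, the two `parsePartition` changes above give the per-path contract sketched below (hypothetical snippet under the same packaging assumption; the expectations mirror the updated test cases later in this diff):

```scala
package org.apache.spark.sql.sources

import org.apache.hadoop.fs.Path

// Illustrative sketch only -- the revised per-path contract of parsePartition.
object ParsePartitionSketch {
  def main(args: Array[String]): Unit = {
    val default = "__HIVE_DEFAULT_PARTITION__" // assumed Hive default partition name

    // No `column=value` segment at all: now None instead of an empty PartitionValues.
    assert(PartitioningUtils.parsePartition(new Path("file:///"), default).isEmpty)

    // Temporary directories are ignored regardless of casing.
    assert(PartitioningUtils.parsePartition(new Path("file:///path/_TEMPORARY/c=1.5"), default).isEmpty)

    // A real partition directory still parses into its column name and value.
    val parsed = PartitioningUtils.parsePartition(new Path("file://path/a=10"), default)
    assert(parsed.exists(_.columnNames == Seq("a")))
  }
}
```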
@@ -156,20 +181,25 @@ private[sql] object PartitioningUtils {
   private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
     // Column names of all partitions must match
     val distinctPartitionsColNames = values.map(_.columnNames).distinct

-    assert(distinctPartitionsColNames.size == 1, {
-      val list = distinctPartitionsColNames.mkString("\t", "\n", "")
-      s"Conflicting partition column names detected:\n$list"
-    })
-
-    // Resolves possible type conflicts for each column
-    val columnCount = values.head.columnNames.size
-    val resolvedValues = (0 until columnCount).map { i =>
-      resolveTypeConflicts(values.map(_.literals(i)))
-    }
-
-    // Fills resolved literals back to each partition
-    values.zipWithIndex.map { case (d, index) =>
-      d.copy(literals = resolvedValues.map(_(index)))
-    }
+    if (distinctPartitionsColNames.isEmpty) {
+      Seq.empty
+    } else {
+      assert(distinctPartitionsColNames.size == 1, {
+        val list = distinctPartitionsColNames.mkString("\t", "\n", "")
+        s"Conflicting partition column names detected:\n$list"
+      })
+
+      // Resolves possible type conflicts for each column
+      val columnCount = values.head.columnNames.size
+      val resolvedValues = (0 until columnCount).map { i =>
+        resolveTypeConflicts(values.map(_.literals(i)))
+      }
+
+      // Fills resolved literals back to each partition
+      values.zipWithIndex.map { case (d, index) =>
+        d.copy(literals = resolvedValues.map(_(index)))
+      }
+    }
   }
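A small sketch of the guard added above (same assumptions as the earlier snippets): with no parsed partition values at all, the `distinct` column-name list is empty, so the old `assert(... == 1)` would have fired; the method now simply returns an empty result.

```scala
package org.apache.spark.sql.sources

// Illustrative sketch only -- resolvePartitions on empty input no longer trips the assertion.
object ResolvePartitionsSketch {
  def main(args: Array[String]): Unit = {
    assert(PartitioningUtils.resolvePartitions(Seq.empty) == Seq.empty)
  }
}
```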
......
@@ -462,12 +462,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   private def discoverPartitions(): PartitionSpec = {
     val leafDirs = fileStatusCache.leafDirs.keys.toSeq
-    if (leafDirs.nonEmpty) {
-      PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
-    } else {
-      PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
-    }
+    PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
   }

   /**
......
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.sources.PartitioningUtils._
-import org.apache.spark.sql.sources.{Partition, PartitionSpec}
+import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, Row, SQLContext}
@@ -66,12 +66,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       assert(message.contains(expected))
     }

-    check("file:///", Some {
-      PartitionValues(
-        ArrayBuffer.empty[String],
-        ArrayBuffer.empty[Literal])
-    })
-
     check("file://path/a=10", Some {
       PartitionValues(
         ArrayBuffer("a"),
@@ -93,6 +87,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
         ArrayBuffer(Literal.create(1.5, FloatType)))
     })

+    check("file:///", None)
+    check("file:///path/_temporary", None)
+    check("file:///path/_temporary/c=1.5", None)
+    check("file:///path/_temporary/path", None)
+
     check("file://path/a=10/_temporary/c=1.5", None)
     check("file://path/a=10/c=1.5/_temporary", None)
@@ -124,6 +122,25 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
           Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"),
           Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))

+    check(Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello",
+      "hdfs://host:9000/path/a=10.5/_temporary",
+      "hdfs://host:9000/path/a=10.5/_TeMpOrArY",
+      "hdfs://host:9000/path/a=10.5/b=hello/_temporary",
+      "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY",
+      "hdfs://host:9000/path/_temporary/path",
+      "hdfs://host:9000/path/a=11/_temporary/path",
+      "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", FloatType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"),
+          Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
+
     check(Seq(
       s"hdfs://host:9000/path/a=10/b=20",
       s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"),
@@ -145,6 +162,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
         Seq(
           Partition(Row(10, null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
           Partition(Row(10.5, null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))
+
+    check(Seq(
+      s"hdfs://host:9000/path1",
+      s"hdfs://host:9000/path2"),
+      PartitionSpec.emptySpec)
   }

   test("read partitioned table - normal case") {
@@ -334,4 +356,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       }
     }
   }
+
+  test("SPARK-7749 Non-partitioned table should have empty partition spec") {
+    withTempPath { dir =>
+      (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
+
+      val queryExecution = read.parquet(dir.getCanonicalPath).queryExecution
+      queryExecution.analyzed.collectFirst {
+        case LogicalRelation(relation: ParquetRelation2) =>
+          assert(relation.partitionSpec === PartitionSpec.emptySpec)
+      }.getOrElse {
+        fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
+      }
+    }
+  }
 }
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll

 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
@@ -29,7 +30,7 @@ import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
 import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
+import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
 import org.apache.spark.util.Utils

 // The data where the partitioning key exists only in the directory structure.
@@ -385,6 +386,54 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     sql("DROP TABLE ms_convert")
   }

+  def collectParquetRelation(df: DataFrame): ParquetRelation2 = {
+    val plan = df.queryExecution.analyzed
+    plan.collectFirst {
+      case LogicalRelation(r: ParquetRelation2) => r
+    }.getOrElse {
+      fail(s"Expecting a ParquetRelation2, but got:\n$plan")
+    }
+  }
+
+  test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
+    sql(
+      s"""CREATE TABLE nonPartitioned (
+         | key INT,
+         | value STRING
+         |)
+         |STORED AS PARQUET
+       """.stripMargin)
+
+    // First lookup fills the cache
+    val r1 = collectParquetRelation(table("nonPartitioned"))
+
+    // Second lookup should reuse the cache
+    val r2 = collectParquetRelation(table("nonPartitioned"))
+
+    // They should be the same instance
+    assert(r1 eq r2)
+
+    sql("DROP TABLE nonPartitioned")
+  }
+
+  test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
+    sql(
+      s"""CREATE TABLE partitioned (
+         | key INT,
+         | value STRING
+         |)
+         |PARTITIONED BY (part INT)
+         |STORED AS PARQUET
+       """.stripMargin)
+
+    // First lookup fills the cache
+    val r1 = collectParquetRelation(table("partitioned"))
+
+    // Second lookup should reuse the cache
+    val r2 = collectParquetRelation(table("partitioned"))
+
+    // They should be the same instance
+    assert(r1 eq r2)
+
+    sql("DROP TABLE partitioned")
+  }
+
   test("Caching converted data source Parquet Relations") {
     def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
       // Converted test_parquet should be cached.
......