From 010a1c278037130a69dcc79427d2b0380a2c82d8 Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Mon, 18 May 2015 11:59:44 -0700
Subject: [PATCH] [SPARK-7570] [SQL] Ignores _temporary during partition
 discovery

Author: Cheng Lian <lian@databricks.com>

Closes #6091 from liancheng/spark-7570 and squashes the following commits:

8ff07e8 [Cheng Lian] Ignores _temporary during partition discovery
---
 .../spark/sql/sources/PartitioningUtils.scala | 15 ++++++---
 .../ParquetPartitionDiscoverySuite.scala      | 31 ++++++++++---------
 2 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index d1f0cdab55..8f8138d6eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -23,8 +23,7 @@ import java.math.{BigDecimal => JBigDecimal}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
-import com.google.common.cache.{CacheBuilder, Cache}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -69,7 +68,7 @@ private[sql] object PartitioningUtils {
   private[sql] def parsePartitions(
       paths: Seq[Path],
       defaultPartitionName: String): PartitionSpec = {
-    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
+    val partitionValues = resolvePartitions(paths.flatMap(parsePartition(_, defaultPartitionName)))
     val fields = {
       val (PartitionValues(columnNames, literals)) = partitionValues.head
       columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
@@ -103,13 +102,19 @@ private[sql] object PartitioningUtils {
    */
   private[sql] def parsePartition(
       path: Path,
-      defaultPartitionName: String): PartitionValues = {
+      defaultPartitionName: String): Option[PartitionValues] = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
     var chopped = path
 
     while (!finished) {
+      // Sometimes (e.g., when speculative tasks are enabled), temporary directories may be left
+      // uncleaned. Here we simply ignore them.
+      if (chopped.getName == "_temporary") {
+        return None
+      }
+
       val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
       maybeColumn.foreach(columns += _)
       chopped = chopped.getParent
@@ -117,7 +122,7 @@ private[sql] object PartitioningUtils {
     }
 
     val (columnNames, values) = columns.reverse.unzip
-    PartitionValues(columnNames, values)
+    Some(PartitionValues(columnNames, values))
   }
 
   private def parsePartitionColumn(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 8079c46071..1927114b8d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -54,44 +54,47 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
   }
 
   test("parse partition") {
-    def check(path: String, expected: PartitionValues): Unit = {
+    def check(path: String, expected: Option[PartitionValues]): Unit = {
       assert(expected === parsePartition(new Path(path), defaultPartitionName))
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName)
+        parsePartition(new Path(path), defaultPartitionName).get
       }.getMessage
 
       assert(message.contains(expected))
     }
 
-    check(
-      "file:///",
+    check("file:///", Some {
       PartitionValues(
         ArrayBuffer.empty[String],
-        ArrayBuffer.empty[Literal]))
+        ArrayBuffer.empty[Literal])
+    })
 
-    check(
-      "file://path/a=10",
+    check("file://path/a=10", Some {
       PartitionValues(
         ArrayBuffer("a"),
-        ArrayBuffer(Literal.create(10, IntegerType))))
+        ArrayBuffer(Literal.create(10, IntegerType)))
+    })
 
-    check(
-      "file://path/a=10/b=hello/c=1.5",
+    check("file://path/a=10/b=hello/c=1.5", Some {
       PartitionValues(
         ArrayBuffer("a", "b", "c"),
         ArrayBuffer(
           Literal.create(10, IntegerType),
           Literal.create("hello", StringType),
-          Literal.create(1.5, FloatType))))
+          Literal.create(1.5, FloatType)))
+    })
 
-    check(
-      "file://path/a=10/b_hello/c=1.5",
+    check("file://path/a=10/b_hello/c=1.5", Some {
       PartitionValues(
         ArrayBuffer("c"),
-        ArrayBuffer(Literal.create(1.5, FloatType))))
+        ArrayBuffer(Literal.create(1.5, FloatType)))
+    })
+
+    check("file://path/a=10/_temporary/c=1.5", None)
+    check("file://path/a=10/c=1.5/_temporary", None)
 
     checkThrows[AssertionError]("file://path/=10", "Empty partition column name")
     checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
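For illustration, below is a minimal, self-contained Scala sketch of the behavior this
patch introduces. It is not the Spark implementation: TemporarySkipSketch, its simplified
PartitionValues, and the string-based path handling are stand-ins (the real parsePartition
walks Hadoop Path objects from leaf to root). What it demonstrates is the same contract:
any _temporary segment makes a path parse to None, and the map -> flatMap change in
parsePartitions is what makes those None results drop out of discovery silently.

object TemporarySkipSketch {
  // Simplified stand-in for Spark's PartitionValues (column names plus raw
  // string values instead of Catalyst Literals).
  case class PartitionValues(columnNames: Seq[String], values: Seq[String])

  // Approximates the patched parsePartition: a _temporary directory anywhere
  // in the path (e.g., left behind by speculative tasks) yields None.
  def parsePartition(path: String): Option[PartitionValues] = {
    val segments = path.stripPrefix("file://").split("/").filter(_.nonEmpty).toSeq
    if (segments.contains("_temporary")) return None
    val columns = segments.collect {
      case s if s.contains("=") =>
        val parts = s.split("=", 2)
        (parts(0), parts(1))
    }
    Some(PartitionValues(columns.map(_._1), columns.map(_._2)))
  }

  def main(args: Array[String]): Unit = {
    val paths = Seq(
      "file://path/a=10/b=hello",
      "file://path/a=10/_temporary/part-00000")
    // Mirrors the map -> flatMap change in parsePartitions: None results for
    // _temporary paths simply disappear instead of failing discovery.
    println(paths.flatMap(parsePartition))
    // Prints: List(PartitionValues(List(a, b),List(10, hello)))
  }
}

Returning Option rather than throwing keeps directories that merely contain leftover
_temporary output usable, while genuinely malformed partition directories still fail
through the assertions exercised by checkThrows above.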