diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index d1f0cdab55f667147142097ed4ecc6c1faaf9f60..8f8138d6ebebce5bbe1ba883d10168acc4532efc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -23,8 +23,7 @@ import java.math.{BigDecimal => JBigDecimal}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
-import com.google.common.cache.{CacheBuilder, Cache}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -69,7 +68,7 @@ private[sql] object PartitioningUtils {
   private[sql] def parsePartitions(
       paths: Seq[Path],
       defaultPartitionName: String): PartitionSpec = {
-    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
+    val partitionValues = resolvePartitions(paths.flatMap(parsePartition(_, defaultPartitionName)))
     val fields = {
       val (PartitionValues(columnNames, literals)) = partitionValues.head
       columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
@@ -103,13 +102,19 @@ private[sql] object PartitioningUtils {
    */
   private[sql] def parsePartition(
       path: Path,
-      defaultPartitionName: String): PartitionValues = {
+      defaultPartitionName: String): Option[PartitionValues] = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
     var chopped = path
 
     while (!finished) {
+      // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
+      // uncleaned. Here we simply ignore them.
+      if (chopped.getName == "_temporary") {
+        return None
+      }
+
       val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
       maybeColumn.foreach(columns += _)
       chopped = chopped.getParent
@@ -117,7 +122,7 @@ private[sql] object PartitioningUtils {
     }
 
     val (columnNames, values) = columns.reverse.unzip
-    PartitionValues(columnNames, values)
+    Some(PartitionValues(columnNames, values))
   }
 
   private def parsePartitionColumn(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 8079c460713da5339d96c5493217c7ec881ca7e1..1927114b8d58f9fd510fb2c3d216a26a35dce12c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -54,44 +54,47 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
   }
 
   test("parse partition") {
-    def check(path: String, expected: PartitionValues): Unit = {
+    def check(path: String, expected: Option[PartitionValues]): Unit = {
       assert(expected === parsePartition(new Path(path), defaultPartitionName))
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName)
+        parsePartition(new Path(path), defaultPartitionName).get
       }.getMessage
 
       assert(message.contains(expected))
     }
 
-    check(
-      "file:///",
+    check("file:///", Some {
       PartitionValues(
         ArrayBuffer.empty[String],
-        ArrayBuffer.empty[Literal]))
+        ArrayBuffer.empty[Literal])
+    })
 
-    check(
-      "file://path/a=10",
+    check("file://path/a=10", Some {
       PartitionValues(
         ArrayBuffer("a"),
-        ArrayBuffer(Literal.create(10, IntegerType))))
+        ArrayBuffer(Literal.create(10, IntegerType)))
+    })
 
-    check(
-      "file://path/a=10/b=hello/c=1.5",
+    check("file://path/a=10/b=hello/c=1.5", Some {
       PartitionValues(
         ArrayBuffer("a", "b", "c"),
         ArrayBuffer(
           Literal.create(10, IntegerType),
           Literal.create("hello", StringType),
-          Literal.create(1.5, FloatType))))
+          Literal.create(1.5, FloatType)))
+    })
 
-    check(
-      "file://path/a=10/b_hello/c=1.5",
+    check("file://path/a=10/b_hello/c=1.5", Some {
       PartitionValues(
         ArrayBuffer("c"),
-        ArrayBuffer(Literal.create(1.5, FloatType))))
+        ArrayBuffer(Literal.create(1.5, FloatType)))
+    })
+
+    check("file://path/a=10/_temporary/c=1.5", None)
+    check("file://path/a=10/c=1.5/_temporary", None)
 
     checkThrows[AssertionError]("file://path/=10", "Empty partition column name")
     checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
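
For reference, below is a minimal standalone sketch of the skip-and-drop pattern this diff introduces: walk up the path components, bail out with `None` as soon as a `_temporary` component is seen, and let the caller's `flatMap` silently discard the ignored paths. The object and method names (`TemporaryDirFilterDemo`, `pathComponents`) are hypothetical and not part of Spark; it only assumes `org.apache.hadoop.fs.Path` on the classpath.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path

// Hypothetical demo object, not Spark code: illustrates the same
// "bail out on _temporary" pattern that parsePartition now uses.
object TemporaryDirFilterDemo {
  // Walks from `path` up to the root, collecting path components.
  // Returns None as soon as a "_temporary" component is seen, so that
  // callers can drop such paths with flatMap, as parsePartitions does.
  def pathComponents(path: Path): Option[Seq[String]] = {
    val components = ArrayBuffer.empty[String]
    var chopped = path
    // Old Hadoop versions don't have `Path.isRoot`; stop at the null parent.
    while (chopped.getParent != null) {
      if (chopped.getName == "_temporary") {
        return None
      }
      components += chopped.getName
      chopped = chopped.getParent
    }
    // Components were collected leaf-first; reverse into root-to-leaf order.
    Some(components.reverse.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val paths = Seq(
      new Path("file://path/a=10/b=hello/c=1.5"),
      new Path("file://path/a=10/_temporary/c=1.5"))

    // flatMap drops the _temporary path entirely, keeping only the
    // components of the clean path -- this is why parsePartitions
    // switched from `map` to `flatMap` in the diff above.
    println(paths.flatMap(pathComponents))
  }
}
```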