diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b1442eec164d8e39628fd85dfe508b7cb01606ce
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.Shell
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+
+object ExternalCatalogUtils {
+  // This duplicates the default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since catalyst
+  // doesn't depend on Hive.
+  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
+
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  val charToEscape = {
+    val bitSet = new java.util.BitSet(128)
+
+    /**
+     * ASCII 01-1F are HTTP control characters that need to be escaped.
+     * \u000A and \u000D are \n and \r, respectively.
+     */
+    val clist = Array(
+      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
+      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
+      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
+      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
+      '{', '[', ']', '^')
+
+    clist.foreach(bitSet.set(_))
+
+    if (Shell.WINDOWS) {
+      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
+    }
+
+    bitSet
+  }
+
+  def needsEscaping(c: Char): Boolean = {
+    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
+  }
+
+  def escapePathName(path: String): String = {
+    val builder = new StringBuilder()
+    path.foreach { c =>
+      if (needsEscaping(c)) {
+        builder.append('%')
+        builder.append(f"${c.asInstanceOf[Int]}%02X")
+      } else {
+        builder.append(c)
+      }
+    }
+
+    builder.toString()
+  }
+
+
+  def unescapePathName(path: String): String = {
+    val sb = new StringBuilder
+    var i = 0
+
+    while (i < path.length) {
+      val c = path.charAt(i)
+      if (c == '%' && i + 2 < path.length) {
+        val code: Int = try {
+          Integer.parseInt(path.substring(i + 1, i + 3), 16)
+        } catch {
+          case _: Exception => -1
+        }
+        if (code >= 0) {
+          sb.append(code.asInstanceOf[Char])
+          i += 3
+        } else {
+          sb.append(c)
+          i += 1
+        }
+      } else {
+        sb.append(c)
+        i += 1
+      }
+    }
+
+    sb.toString()
+  }
+
+  def generatePartitionPath(
+      spec: TablePartitionSpec,
+      partitionColumnNames: Seq[String],
+      tablePath: Path): Path = {
+    val partitionPathStrings = partitionColumnNames.map { col =>
+      val partitionValue = spec(col)
+      val partitionString = if (partitionValue == null) {
+        DEFAULT_PARTITION_NAME
+      } else {
+        escapePathName(partitionValue)
+      }
+      escapePathName(col) + "=" + partitionString
+    }
+    partitionPathStrings.foldLeft(tablePath) { (totalPath, nextPartPath) =>
+      new Path(totalPath, nextPartPath)
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 20db81e6f9060ba1fa87e36d9fcaffbc35bd62cd..a3ffeaa63f690ba833beb1fcb093bb20f22ea07c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -231,7 +231,7 @@ class InMemoryCatalog(
         assert(tableMeta.storage.locationUri.isDefined,
           "Managed table should always have table location, as we will assign a default location " +
             "to it if it doesn't have one.")
-        val dir = new Path(tableMeta.storage.locationUri.get)
+        val dir = new Path(tableMeta.location)
         try {
           val fs = dir.getFileSystem(hadoopConfig)
           fs.delete(dir, true)
@@ -259,7 +259,7 @@ class InMemoryCatalog(
       assert(oldDesc.table.storage.locationUri.isDefined,
         "Managed table should always have table location, as we will assign a default location " +
           "to it if it doesn't have one.")
-      val oldDir = new Path(oldDesc.table.storage.locationUri.get)
+      val oldDir = new Path(oldDesc.table.location)
       val newDir = new Path(catalog(db).db.locationUri, newName)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
@@ -355,25 +355,28 @@ class InMemoryCatalog(
       }
     }

-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    val tablePath = new Path(tableMeta.location)
     // TODO: we should follow hive to roll back if one partition path failed to create.
     parts.foreach { p =>
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (p.storage.locationUri.isEmpty) {
-        val partitionPath = partitionColumnNames.flatMap { col =>
-          p.spec.get(col).map(col + "=" + _)
-        }.mkString("/")
-        try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.mkdirs(new Path(tableDir, partitionPath))
-        } catch {
-          case e: IOException =>
-            throw new SparkException(s"Unable to create partition path $partitionPath", e)
+      val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+        ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
+      }
+
+      try {
+        val fs = tablePath.getFileSystem(hadoopConfig)
+        if (!fs.exists(partitionPath)) {
+          fs.mkdirs(partitionPath)
         }
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to create partition path $partitionPath", e)
       }
-      existingParts.put(p.spec, p)
+
+      existingParts.put(
+        p.spec,
+        p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
     }
   }

@@ -392,19 +395,15 @@ class InMemoryCatalog(
       }
     }

-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
-    // TODO: we should follow hive to roll back if one partition path failed to delete.
+    val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+    // TODO: we should follow Hive to roll back if deleting one partition path fails, and support
+    // partial partition specs.
     partSpecs.foreach { p =>
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (existingParts.contains(p) && existingParts(p).storage.locationUri.isEmpty) {
-        val partitionPath = partitionColumnNames.flatMap { col =>
-          p.get(col).map(col + "=" + _)
-        }.mkString("/")
+      if (existingParts.contains(p) && shouldRemovePartitionLocation) {
+        val partitionPath = new Path(existingParts(p).location)
         try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.delete(new Path(tableDir, partitionPath), true)
+          val fs = partitionPath.getFileSystem(hadoopConfig)
+          fs.delete(partitionPath, true)
         } catch {
           case e: IOException =>
             throw new SparkException(s"Unable to delete partition path $partitionPath", e)
@@ -423,33 +422,34 @@
     requirePartitionsExist(db, table, specs)
     requirePartitionsNotExist(db, table, newSpecs)

-    val tableDir = new Path(catalog(db).db.locationUri, table)
-    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    val tablePath = new Path(tableMeta.location)
+    val shouldUpdatePartitionLocation = tableMeta.tableType == CatalogTableType.MANAGED
+    val existingParts = catalog(db).tables(table).partitions
     // TODO: we should follow hive to roll back if one partition path failed to rename.
     specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
-      val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
-      val existingParts = catalog(db).tables(table).partitions
-
-      // If location is set, the partition is using an external partition location and we don't
-      // need to handle its directory.
-      if (newPart.storage.locationUri.isEmpty) {
-        val oldPath = partitionColumnNames.flatMap { col =>
-          oldSpec.get(col).map(col + "=" + _)
-        }.mkString("/")
-        val newPath = partitionColumnNames.flatMap { col =>
-          newSpec.get(col).map(col + "=" + _)
-        }.mkString("/")
+      val oldPartition = getPartition(db, table, oldSpec)
+      val newPartition = if (shouldUpdatePartitionLocation) {
+        val oldPartPath = new Path(oldPartition.location)
+        val newPartPath = ExternalCatalogUtils.generatePartitionPath(
+          newSpec, partitionColumnNames, tablePath)
         try {
-          val fs = tableDir.getFileSystem(hadoopConfig)
-          fs.rename(new Path(tableDir, oldPath), new Path(tableDir, newPath))
+          val fs = tablePath.getFileSystem(hadoopConfig)
+          fs.rename(oldPartPath, newPartPath)
         } catch {
           case e: IOException =>
-            throw new SparkException(s"Unable to rename partition path $oldPath", e)
+            throw new SparkException(s"Unable to rename partition path $oldPartPath", e)
         }
+        oldPartition.copy(
+          spec = newSpec,
+          storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString)))
+      } else {
+        oldPartition.copy(spec = newSpec)
       }
       existingParts.remove(oldSpec)
-      existingParts.put(newSpec, newPart)
+      existingParts.put(newSpec, newPartition)
     }
   }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 34748a04859adcf19d38948ecd6f1bc6b1be0172..93c70de18ae7e2a464876ec147ea486291256aa8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -99,6 +99,12 @@ case class CatalogTablePartition(
     output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
   }

+  /** Return the partition location, assuming it is specified. */
+  def location: String = storage.locationUri.getOrElse {
+    val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
+    throw new AnalysisException(s"Partition [$specString] did not specify locationUri")
+  }
+
   /**
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
@@ -171,6 +177,11 @@ case class CatalogTable(
     throw new AnalysisException(s"table $identifier did not specify database")
   }

+  /** Return the table location, assuming it is specified. */
+  def location: String = storage.locationUri.getOrElse {
+    throw new AnalysisException(s"table $identifier did not specify locationUri")
+  }
+
   /** Return the fully qualified name of this table, assuming the database was specified. */
   def qualifiedName: String = identifier.unquotedString

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 34bdfc8a98710181e49ebf87dd63a9443a8e9626..303a8662d3f4deab0c5b63f6acbc5b7d26b45e11 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -17,9 +17,8 @@

 package org.apache.spark.sql.catalyst.catalog

-import java.io.File
-import java.net.URI
-
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach

 import org.apache.spark.SparkFunSuite
@@ -320,6 +319,33 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
   }

+  test("create partitions without location") {
+    val catalog = newBasicCatalog()
+    val table = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string"),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("partCol1", "partCol2"))
+    catalog.createTable(table, ignoreIfExists = false)
+
+    val partition = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
+    catalog.createPartitions("db1", "tbl", Seq(partition), ignoreIfExists = false)
+
+    val partitionLocation = catalog.getPartition(
+      "db1",
+      "tbl",
+      Map("partCol1" -> "1", "partCol2" -> "2")).location
+    val tableLocation = catalog.getTable("db1", "tbl").location
+    val defaultPartitionLocation = new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2")
+    assert(new Path(partitionLocation) == defaultPartitionLocation)
+  }
+
   test("list partitions with partial partition spec") {
     val catalog = newBasicCatalog()
     val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1")))
@@ -399,6 +425,46 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     intercept[AnalysisException] { catalog.getPartition("db2", "tbl2", part2.spec) }
   }

+  test("rename partitions should update the location for managed table") {
+    val catalog = newBasicCatalog()
+    val table = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string"),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("partCol1", "partCol2"))
+    catalog.createTable(table, ignoreIfExists = false)
+
+    val tableLocation = catalog.getTable("db1", "tbl").location
+
+    val mixedCasePart1 = CatalogTablePartition(
+      Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
+    val mixedCasePart2 = CatalogTablePartition(
+      Map("partCol1" -> "3", "partCol2" -> "4"), storageFormat)
+
+    catalog.createPartitions("db1", "tbl", Seq(mixedCasePart1), ignoreIfExists = false)
+    assert(
+      new Path(catalog.getPartition("db1", "tbl", mixedCasePart1.spec).location) ==
+        new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2"))
+
+    catalog.renamePartitions("db1", "tbl", Seq(mixedCasePart1.spec), Seq(mixedCasePart2.spec))
+    assert(
+      new Path(catalog.getPartition("db1", "tbl", mixedCasePart2.spec).location) ==
+        new Path(new Path(tableLocation, "partCol1=3"), "partCol2=4"))
+
+    // For external tables, RENAME PARTITION should not update the partition location.
+    val existingPartLoc = catalog.getPartition("db2", "tbl2", part1.spec).location
+    catalog.renamePartitions("db2", "tbl2", Seq(part1.spec), Seq(part3.spec))
+    assert(
+      new Path(catalog.getPartition("db2", "tbl2", part3.spec).location) ==
+        new Path(existingPartLoc))
+  }
+
   test("rename partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
@@ -419,11 +485,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
   test("alter partitions") {
     val catalog = newBasicCatalog()
     try {
-      // Note: Before altering table partitions in Hive, you *must* set the current database
-      // to the one that contains the table of interest. Otherwise you will end up with the
-      // most helpful error message ever: "Unable to alter partition. alter is not possible."
-      // See HIVE-2742 for more detail.
-      catalog.setCurrentDatabase("db2")
       val newLocation = newUriForDatabase()
       val newSerde = "com.sparkbricks.text.EasySerde"
       val newSerdeProps = Map("spark" -> "bricks", "compressed" -> "false")
@@ -571,10 +632,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
   // --------------------------------------------------------------------------

   private def exists(uri: String, children: String*): Boolean = {
-    val base = new File(new URI(uri))
-    children.foldLeft(base) {
-      case (parent, child) => new File(parent, child)
-    }.exists()
+    val base = new Path(uri)
+    val finalPath = children.foldLeft(base) {
+      case (parent, child) => new Path(parent, child)
+    }
+    base.getFileSystem(new Configuration()).exists(finalPath)
   }

   test("create/drop database should create/delete the directory") {
@@ -623,7 +685,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac

   test("create/drop/rename partitions should create/delete/rename the directory") {
     val catalog = newBasicCatalog()
-    val databaseDir = catalog.getDatabase("db1").locationUri
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
@@ -631,34 +692,61 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
-        .add("a", "int")
-        .add("b", "string"),
+        .add("partCol1", "int")
+        .add("partCol2", "string"),
       provider = Some("hive"),
-      partitionColumnNames = Seq("a", "b")
-    )
+      partitionColumnNames = Seq("partCol1", "partCol2"))
     catalog.createTable(table, ignoreIfExists = false)
+
+    val tableLocation = catalog.getTable("db1", "tbl").location
+
+    val part1 = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
+    val part2 = CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"), storageFormat)
+    val part3 = CatalogTablePartition(Map("partCol1" -> "5", "partCol2" -> "6"), storageFormat)
+
     catalog.createPartitions("db1", "tbl", Seq(part1, part2), ignoreIfExists = false)
-    assert(exists(databaseDir, "tbl", "a=1", "b=2"))
-    assert(exists(databaseDir, "tbl", "a=3", "b=4"))
+    assert(exists(tableLocation, "partCol1=1", "partCol2=2"))
+    assert(exists(tableLocation, "partCol1=3", "partCol2=4"))

     catalog.renamePartitions("db1", "tbl", Seq(part1.spec), Seq(part3.spec))
-    assert(!exists(databaseDir, "tbl", "a=1", "b=2"))
-    assert(exists(databaseDir, "tbl", "a=5", "b=6"))
+    assert(!exists(tableLocation, "partCol1=1", "partCol2=2"))
+    assert(exists(tableLocation, "partCol1=5", "partCol2=6"))

     catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false,
       purge = false)
-    assert(!exists(databaseDir, "tbl", "a=3", "b=4"))
-    assert(!exists(databaseDir, "tbl", "a=5", "b=6"))
+    assert(!exists(tableLocation, "partCol1=3", "partCol2=4"))
+    assert(!exists(tableLocation, "partCol1=5", "partCol2=6"))

-    val externalPartition = CatalogTablePartition(
-      Map("a" -> "7", "b" -> "8"),
+    val tempPath = Utils.createTempDir()
+    // Creating a partition whose directory already exists is OK.
+    val partWithExistingDir = CatalogTablePartition(
+      Map("partCol1" -> "7", "partCol2" -> "8"),
       CatalogStorageFormat(
-        Some(Utils.createTempDir().getAbsolutePath),
-        None, None, None, false, Map.empty)
-    )
-    catalog.createPartitions("db1", "tbl", Seq(externalPartition), ignoreIfExists = false)
-    assert(!exists(databaseDir, "tbl", "a=7", "b=8"))
+        Some(tempPath.getAbsolutePath),
+        None, None, None, false, Map.empty))
+    catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false)
+
+    tempPath.delete()
+    // Creating a partition whose directory does not exist will create that directory.
+    val partWithNonExistingDir = CatalogTablePartition(
+      Map("partCol1" -> "9", "partCol2" -> "10"),
+      CatalogStorageFormat(
+        Some(tempPath.getAbsolutePath),
+        None, None, None, false, Map.empty))
+    catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false)
+    assert(tempPath.exists())
+  }
+
+  test("drop partition from external table should not delete the directory") {
+    val catalog = newBasicCatalog()
+    catalog.createPartitions("db2", "tbl1", Seq(part1), ignoreIfExists = false)
+
+    val partPath = new Path(catalog.getPartition("db2", "tbl1", part1.spec).location)
+    val fs = partPath.getFileSystem(new Configuration)
+    assert(fs.exists(partPath))
+
+    catalog.dropPartitions("db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+    assert(fs.exists(partPath))
   }
 }

@@ -731,7 +819,7 @@ abstract class CatalogTestUtils {
     CatalogTable(
       identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL,
-      storage = storageFormat,
+      storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)),
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 001d9c47785d236efa3f395c8d5ecfff994984d3..52385de50db6bd9578021b8589c0f322419214d3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -527,13 +527,13 @@ class SessionCatalogSuite extends SparkFunSuite {
     sessionCatalog.createTable(newTable("tbl", "mydb"), ignoreIfExists = false)
     sessionCatalog.createPartitions(
       TableIdentifier("tbl", Some("mydb")), Seq(part1, part2), ignoreIfExists = false)
-    assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2)))
+    assert(catalogPartitionsEqual(externalCatalog.listPartitions("mydb", "tbl"), part1, part2))
     // Create partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("mydb")
     sessionCatalog.createPartitions(
       TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false)
     assert(catalogPartitionsEqual(
-      externalCatalog, "mydb", "tbl", Seq(part1, part2, partWithMixedOrder)))
+      externalCatalog.listPartitions("mydb", "tbl"), part1, part2, partWithMixedOrder))
   }

   test("create partitions when database/table does not exist") {
@@ -586,13 +586,13 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("drop partitions") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
+    assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part1, part2))
     sessionCatalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec),
       ignoreIfNotExists = false,
       purge = false)
-    assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2)))
+    assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2))
     // Drop partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
     sessionCatalog.dropPartitions(
@@ -604,7 +604,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     // Drop multiple partitions at once
     sessionCatalog.createPartitions(
       TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false)
-    assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
+    assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part1, part2))
     sessionCatalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec, part2.spec),
@@ -844,10 +844,11 @@ class SessionCatalogSuite extends SparkFunSuite {

   test("list partitions") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))).toSet == Set(part1, part2))
+    assert(catalogPartitionsEqual(
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))), part1, part2))
     // List partitions without explicitly specifying database
     catalog.setCurrentDatabase("db2")
-    assert(catalog.listPartitions(TableIdentifier("tbl2")).toSet == Set(part1, part2))
+    assert(catalogPartitionsEqual(catalog.listPartitions(TableIdentifier("tbl2")), part1, part2))
   }

   test("list partitions when database/table does not exist") {
@@ -860,6 +861,15 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }

+  private def catalogPartitionsEqual(
+      actualParts: Seq[CatalogTablePartition],
+      expectedParts: CatalogTablePartition*): Boolean = {
+    // ExternalCatalog may set a default location for partitions, so here we ignore the partition
+    // location when comparing them.
+    actualParts.map(p => p.copy(storage = p.storage.copy(locationUri = None))).toSet ==
+      expectedParts.map(p => p.copy(storage = p.storage.copy(locationUri = None))).toSet
+  }
+
   // --------------------------------------------------------------------------
   // Functions
   // --------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 8500ab460a1b67d35514d51dec85b339c67dbbbf..84a63fdb9f36fb1019f5568dadc9655de02839f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
@@ -500,7 +500,7 @@ case class AlterTableRecoverPartitionsCommand(
         s"location provided: $tableIdentWithDB")
     }

-    val root = new Path(table.storage.locationUri.get)
+    val root = new Path(table.location)
     logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)

@@ -558,9 +558,9 @@ case class AlterTableRecoverPartitionsCommand(
       val name = st.getPath.getName
       if (st.isDirectory && name.contains("=")) {
         val ps = name.split("=", 2)
-        val columnName = PartitioningUtils.unescapePathName(ps(0))
+        val columnName = ExternalCatalogUtils.unescapePathName(ps(0))
         // TODO: Validate the value
-        val value = PartitioningUtils.unescapePathName(ps(1))
+        val value = ExternalCatalogUtils.unescapePathName(ps(1))
         if (resolver(columnName, partitionNames.head)) {
           scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
             partitionNames.drop(1), threshold, resolver)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e49a1f5acd0c9b7cd168253dfa2c72c367313a89..119e732d0202cb526a75483532a1c3cbd9107674 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -710,7 +710,8 @@ case class ShowPartitionsCommand(

   private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
     partColNames.map { name =>
-      PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name))
+      ExternalCatalogUtils.escapePathName(name) + "=" +
+        ExternalCatalogUtils.escapePathName(spec(name))
     }.mkString(File.separator)
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 443a2ec033a985bf913eacdcaa8fa5a53279572c..4ad91dcceb432975f0bd7bd133fc87090c548c85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -67,7 +67,7 @@ class CatalogFileIndex(
       val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
       val partitions = selectedPartitions.map { p =>
-        val path = new Path(p.storage.locationUri.get)
+        val path = new Path(p.location)
         val fs = path.getFileSystem(hadoopConf)
         PartitionPath(
           p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2d43a6ad098ed2ccb4b80ac0a71ae686648bb27f..739aeac877b99f9c0fda4c261bf776256281ebc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -190,7 +190,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       val effectiveOutputPath = if (overwritingSinglePartition) {
         val partition = t.sparkSession.sessionState.catalog.getPartition(
           l.catalogTable.get.identifier, overwrite.specificPartition.get)
-        new Path(partition.storage.locationUri.get)
+        new Path(partition.location)
       } else {
         outputPath
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index fa7fe143daeba5b2ab1dba28b7247dd6f1022739..69b3fa667ef54c65ae4674360dd0d5bef1775259 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
@@ -281,11 +281,11 @@ object FileFormatWriter extends Logging {
     private def partitionStringExpression: Seq[Expression] = {
       description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
         val escaped = ScalaUDF(
-          PartitioningUtils.escapePathName _,
+          ExternalCatalogUtils.escapePathName _,
           StringType,
           Seq(Cast(c, StringType)),
           Seq(StringType))
-        val str = If(IsNull(c), Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
+        val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
         val partitionName = Literal(c.name + "=") :: str :: Nil
         if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index a8a722dd3c6200fd4cbf4c6e0e2f59d004b7fd5b..3740caa22c37e041daccd203b1765b76d6ae6cd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -128,7 +128,6 @@ abstract class PartitioningAwareFileIndex(
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
         val spec = PartitioningUtils.parsePartitions(
           leafDirs,
-          PartitioningUtils.DEFAULT_PARTITION_NAME,
           typeInference = false,
           basePaths = basePaths)

@@ -148,7 +147,6 @@ abstract class PartitioningAwareFileIndex(
       case _ =>
         PartitioningUtils.parsePartitions(
           leafDirs,
-          PartitioningUtils.DEFAULT_PARTITION_NAME,
           typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
           basePaths = basePaths)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index b51b41869bf06241480c448a961561bf80dd94c9..a28b04ca3fb5af9493f09f516d0d2aa79d50d5db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Try

 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.util.Shell

 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
@@ -56,15 +55,15 @@ object PartitionSpec {
 }

 object PartitioningUtils {
-  // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
-  // depend on Hive.
-  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

   private[datasources] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal])
   {
     require(columnNames.size == literals.size)
   }

+  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
+
   /**
    * Given a group of qualified paths, tries to parse them and returns a partition specification.
    * For example, given:
@@ -90,12 +89,11 @@
    */
   private[datasources] def parsePartitions(
       paths: Seq[Path],
-      defaultPartitionName: String,
       typeInference: Boolean,
       basePaths: Set[Path]): PartitionSpec = {
     // First, we need to parse every partition's path and see if we can find partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
-      parsePartition(path, defaultPartitionName, typeInference, basePaths)
+      parsePartition(path, typeInference, basePaths)
     }.unzip

     // We create pairs of (path -> path's partition value) here
@@ -173,7 +171,6 @@
    */
   private[datasources] def parsePartition(
       path: Path,
-      defaultPartitionName: String,
       typeInference: Boolean,
       basePaths: Set[Path]): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
@@ -196,7 +193,7 @@
       // Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1.
       // Once we get the string, we try to parse it and find the partition column and value.
       val maybeColumn =
-        parsePartitionColumn(currentPath.getName, defaultPartitionName, typeInference)
+        parsePartitionColumn(currentPath.getName, typeInference)
       maybeColumn.foreach(columns += _)

       // Now, we determine if we should stop.
@@ -228,7 +225,6 @@

   private def parsePartitionColumn(
       columnSpec: String,
-      defaultPartitionName: String,
       typeInference: Boolean): Option[(String, Literal)] = {
     val equalSignIndex = columnSpec.indexOf('=')
     if (equalSignIndex == -1) {
@@ -240,7 +236,7 @@
       val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
       assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")

-      val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference)
+      val literal = inferPartitionColumnValue(rawColumnValue, typeInference)
       Some(columnName -> literal)
     }
   }
@@ -355,7 +351,6 @@
    */
   private[datasources] def inferPartitionColumnValue(
       raw: String,
-      defaultPartitionName: String,
       typeInference: Boolean): Literal = {
     val decimalTry = Try {
       // `BigDecimal` conversion can fail when the `field` is not a form of number.
@@ -380,14 +375,14 @@
         .orElse(Try(Literal(JTimestamp.valueOf(unescapePathName(raw)))))
         // Then falls back to string
         .getOrElse {
-          if (raw == defaultPartitionName) {
+          if (raw == DEFAULT_PARTITION_NAME) {
             Literal.create(null, NullType)
           } else {
             Literal.create(unescapePathName(raw), StringType)
           }
         }
     } else {
-      if (raw == defaultPartitionName) {
+      if (raw == DEFAULT_PARTITION_NAME) {
         Literal.create(null, NullType)
       } else {
         Literal.create(unescapePathName(raw), StringType)
@@ -450,77 +445,4 @@
       Literal.create(Cast(l, desiredType).eval(), desiredType)
     }
   }
-
-  //////////////////////////////////////////////////////////////////////////////////////////////
-  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
-  //////////////////////////////////////////////////////////////////////////////////////////////
-
-  val charToEscape = {
-    val bitSet = new java.util.BitSet(128)
-
-    /**
-     * ASCII 01-1F are HTTP control characters that need to be escaped.
-     * \u000A and \u000D are \n and \r, respectively.
-     */
-    val clist = Array(
-      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
-      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
-      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
-      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
-      '{', '[', ']', '^')
-
-    clist.foreach(bitSet.set(_))
-
-    if (Shell.WINDOWS) {
-      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
-    }
-
-    bitSet
-  }
-
-  def needsEscaping(c: Char): Boolean = {
-    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
-  }
-
-  def escapePathName(path: String): String = {
-    val builder = new StringBuilder()
-    path.foreach { c =>
-      if (needsEscaping(c)) {
-        builder.append('%')
-        builder.append(f"${c.asInstanceOf[Int]}%02X")
-      } else {
-        builder.append(c)
-      }
-    }
-
-    builder.toString()
-  }
-
-  def unescapePathName(path: String): String = {
-    val sb = new StringBuilder
-    var i = 0
-
-    while (i < path.length) {
-      val c = path.charAt(i)
-      if (c == '%' && i + 2 < path.length) {
-        val code: Int = try {
-          Integer.parseInt(path.substring(i + 1, i + 3), 16)
-        } catch {
-          case _: Exception => -1
-        }
-        if (code >= 0) {
-          sb.append(code.asInstanceOf[Char])
-          i += 3
-        } else {
-          sb.append(c)
-          i += 1
-        }
-      } else {
-        sb.append(c)
-        i += 1
-      }
-    }
-
-    sb.toString()
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index df3a3c34c39a07a156665a869577943b32f16aa0..363715c6d22499f2422aeec802bc8a9d0d3abba1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -875,7 +875,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))

     val part2 = Map("a" -> "2", "b" -> "6")
-    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val root = new Path(catalog.getTableMetadata(tableIdent).location)
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
     // valid
     fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
@@ -1133,7 +1133,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
     assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined)
     assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
-    assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
+    assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isDefined)
     assert(catalog.getPartition(tableIdent, partSpec).storage.properties.isEmpty)
     // Verify that the location is set to the expected string
     def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = {
@@ -1296,9 +1296,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
       "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
-    assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
+    assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
     assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
-    assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
+    assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)

     // add partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 120a3a2ef33aaa3cbd25f9dbc4aec21caa701eac..22e35a1bc0b1d7d49447c67eb1d3adf3c9303c16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat

 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
@@ -48,11 +49,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
   import PartitioningUtils._
   import testImplicits._

-  val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
+  val defaultPartitionName = ExternalCatalogUtils.DEFAULT_PARTITION_NAME

   test("column type inference") {
     def check(raw: String, literal: Literal): Unit = {
-      assert(inferPartitionColumnValue(raw, defaultPartitionName, true) === literal)
+      assert(inferPartitionColumnValue(raw, true) === literal)
     }

     check("10", Literal.create(10, IntegerType))
@@ -76,7 +77,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       "hdfs://host:9000/path/a=10.5/b=hello")

     var exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), defaultPartitionName, true, Set.empty[Path])
+      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path])
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))

@@ -88,7 +89,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha

     parsePartitions(
       paths.map(new Path(_)),
-      defaultPartitionName,
       true,
       Set(new Path("hdfs://host:9000/path/")))

@@ -101,7 +101,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha

     parsePartitions(
       paths.map(new Path(_)),
-      defaultPartitionName,
       true,
       Set(new Path("hdfs://host:9000/path/something=true/table")))

@@ -114,7 +113,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha

     parsePartitions(
       paths.map(new Path(_)),
-      defaultPartitionName,
       true,
       Set(new Path("hdfs://host:9000/path/table=true")))

@@ -127,7 +125,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     exception = intercept[AssertionError] {
       parsePartitions(
         paths.map(new Path(_)),
-        defaultPartitionName,
         true,
         Set(new Path("hdfs://host:9000/path/")))
     }
@@ -147,7 +144,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     exception = intercept[AssertionError] {
       parsePartitions(
         paths.map(new Path(_)),
-        defaultPartitionName,
         true,
         Set(new Path("hdfs://host:9000/tmp/tables/")))
     }
@@ -156,13 +152,13 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha

   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      val actual = parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])._1
+      val actual = parsePartition(new Path(path), true, Set.empty[Path])._1
       assert(expected === actual)
     }

     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])
+        parsePartition(new Path(path), true, Set.empty[Path])
       }.getMessage

       assert(message.contains(expected))
@@ -204,7 +200,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     // when the basePaths is the same as the path to a leaf directory
     val partitionSpec1: Option[PartitionValues] = parsePartition(
       path = new Path("file://path/a=10"),
-      defaultPartitionName = defaultPartitionName,
       typeInference = true,
       basePaths = Set(new Path("file://path/a=10")))._1

@@ -213,7 +208,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     // when the basePaths is the path to a base directory of leaf directories
     val partitionSpec2: Option[PartitionValues] = parsePartition(
       path = new Path("file://path/a=10"),
-      defaultPartitionName = defaultPartitionName,
       typeInference = true,
       basePaths = Set(new Path("file://path")))._1

@@ -231,7 +225,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       val actualSpec =
         parsePartitions(
           paths.map(new Path(_)),
-          defaultPartitionName,
           true,
           rootPaths)
       assert(actualSpec === spec)
@@ -314,7 +307,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
   test("parse partitions with type inference disabled") {
     def check(paths: Seq[String], spec: PartitionSpec): Unit = {
       val actualSpec =
-        parsePartitions(paths.map(new Path(_)), defaultPartitionName, false, Set.empty[Path])
+        parsePartitions(paths.map(new Path(_)), false, Set.empty[Path])
       assert(actualSpec === spec)
     }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index b537061d0d22155ac08f8140d1bb1745fe102ac9..42ce1a88a2b67fdce229b1411fadc07546407255 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.hive

+import java.io.IOException
 import java.util

 import scala.util.control.NonFatal
@@ -26,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.thrift.TException

-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -255,7 +256,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         // compatible format, which means the data source is file-based and must have a `path`.
         require(tableDefinition.storage.locationUri.isDefined,
           "External file-based data source table must have a `path` entry in storage properties.")
-        Some(new Path(tableDefinition.storage.locationUri.get).toUri.toString)
+        Some(new Path(tableDefinition.location).toUri.toString)
       } else {
         None
       }
@@ -789,7 +790,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    val lowerCasedParts = parts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    val tablePath = new Path(tableMeta.location)
+    val partsWithLocation = parts.map { p =>
+      // Ideally we could leave the partition location empty and let the Hive metastore set it.
+      // However, the Hive metastore is not case preserving and would generate a wrong partition
+      // location with lower-cased partition column names. Here we set the default partition
+      // location manually to avoid this problem.
+      val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+        ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
+      }
+      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString)))
+    }
+    val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
     client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
   }

@@ -810,6 +825,31 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
     client.renamePartitions(
       db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
+
+    val tableMeta = getTable(db, table)
+    val partitionColumnNames = tableMeta.partitionColumnNames
+    // Hive metastore is not case preserving and keeps partition columns with lower-cased names.
+    // When Hive renames a partition of a managed table, it creates the new partition location
+    // using a default path generated from the new spec with lower-cased partition column names.
+    // This is unexpected, so we rename the directory manually and alter the partition location.
+    val hasUpperCasePartitionColumn = partitionColumnNames.exists(col => col.toLowerCase != col)
+    if (tableMeta.tableType == MANAGED && hasUpperCasePartitionColumn) {
+      val tablePath = new Path(tableMeta.location)
+      val newParts = newSpecs.map { spec =>
+        val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+        val wrongPath = new Path(partition.location)
+        val rightPath = ExternalCatalogUtils.generatePartitionPath(
+          spec, partitionColumnNames, tablePath)
+        try {
+          tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)
+        } catch {
+          case e: IOException => throw new SparkException(
+            s"Unable to rename partition path from $wrongPath to $rightPath", e)
+        }
+        partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString)))
+      }
+      alterPartitions(db, table, newParts)
+    }
   }

   override def alterPartitions(
@@ -817,6 +857,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
     val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    // Note: Before altering table partitions in Hive, you *must* set the current database
+    // to the one that contains the table of interest. Otherwise you will end up with the
+    // most helpful error message ever: "Unable to alter partition. alter is not possible."
+    // See HIVE-2742 for more detail.
+    client.setCurrentDatabase(db)
     client.alterPartitions(db, table, lowerCasedParts)
   }

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index d3873cf6c82313252d81ac61177869c94947baba..fbd705172cae6468a95bde785849f74dd1cf55d7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -445,7 +445,7 @@ object SetWarehouseLocationTest extends Logging {
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
       val expectedLocation =
         "file:" + expectedWarehouseLocation.toString + "/testlocation"
-      val actualLocation = tableMetadata.storage.locationUri.get
+      val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(
           s"Expected table location is $expectedLocation. But, it is actually $actualLocation")
@@ -461,7 +461,7 @@ object SetWarehouseLocationTest extends Logging {
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
       val expectedLocation =
         "file:" + expectedWarehouseLocation.toString + "/testlocationdb.db/testlocation"
-      val actualLocation = tableMetadata.storage.locationUri.get
+      val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(
           s"Expected table location is $expectedLocation. But, it is actually $actualLocation")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index cfc1d81d544eb28b4223a5a0ece8622b3c1e1714..9f4401ae225606fe72ef847e91f61704b58dc6cc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -29,7 +29,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     val expectedPath =
       spark.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName

-    assert(metastoreTable.storage.locationUri.get === expectedPath)
+    assert(metastoreTable.location === expectedPath)
   }

   private def getTableNames(dbName: Option[String] = None): Array[String] = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 0076a778683cad3b79e37ac5600cbd4139b98aab..6efae13ddf69dd922cd5bdf1098dda561173b465 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -425,7 +425,7 @@ class HiveDDLSuite
     sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b INT)")
     val part1 = Map("a" -> "1", "b" -> "5")
     val part2 = Map("a" -> "2", "b" -> "6")
-    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val root = new Path(catalog.getTableMetadata(tableIdent).location)
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
     // valid
     fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
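[Editor's note: to make the case-sensitivity workaround in the HiveExternalCatalog.renamePartitions change above concrete, here is a minimal sketch. It is not part of the patch; the object name, warehouse path, and partition values are hypothetical, and it assumes the patched spark-catalyst (with ExternalCatalogUtils) is on the classpath.]

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

object RenamePartitionPathExample extends App {
  // Hypothetical managed-table layout, for illustration only.
  val tablePath = new Path("/warehouse/db.db/tbl")
  val newSpec = Map("partCol1" -> "3", "partCol2" -> "4")

  // Hive's metastore keeps partition column names lower-cased, so after RENAME
  // PARTITION it reports a directory like this (the `wrongPath` in the patch):
  val wrongPath = new Path(tablePath, "partcol1=3/partcol2=4")

  // The patch recomputes the case-preserving location and renames the directory to it:
  val rightPath = ExternalCatalogUtils.generatePartitionPath(
    newSpec, Seq("partCol1", "partCol2"), tablePath)
  assert(rightPath.toString == "/warehouse/db.db/tbl/partCol1=3/partCol2=4")
  println(s"$wrongPath -> $rightPath")
}
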
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c21db3595fa19b268e1519da458bf6717f5753fb..e607af67f93e59a29243f77edcdf6a1e37b06389 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -542,7 +542,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         }
         userSpecifiedLocation match {
           case Some(location) =>
-            assert(r.catalogTable.storage.locationUri.get === location)
+            assert(r.catalogTable.location === location)
           case None => // OK.
         }
         // Also make sure that the format and serde are as desired.
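[Editor's note: for reference, a minimal sketch of how the escaping and path-generation utilities introduced in ExternalCatalogUtils behave. It is not part of the patch; the object name, warehouse path, and partition values are hypothetical, and it assumes the patched spark-catalyst is on the classpath.]

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

object PartitionPathEscapingExample extends App {
  // '/' is on the escape list, so it is percent-encoded, Hive-style...
  val escaped = ExternalCatalogUtils.escapePathName("2016/11/01")
  assert(escaped == "2016%2F11%2F01")
  // ...and unescapePathName is its inverse.
  assert(ExternalCatalogUtils.unescapePathName(escaped) == "2016/11/01")

  // generatePartitionPath nests one escaped `col=value` directory per partition
  // column, in partition-column order, under the table path.
  val path = ExternalCatalogUtils.generatePartitionPath(
    Map("partCol1" -> "1", "partCol2" -> "2016/11/01"),
    Seq("partCol1", "partCol2"),
    new Path("/warehouse/tbl"))
  assert(path.toString == "/warehouse/tbl/partCol1=1/partCol2=2016%2F11%2F01")
}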