Commit 008a5377 authored by Andrew Or

[SPARK-15538][SPARK-15539][SQL] Truncate table fixes round 2

## What changes were proposed in this pull request?

Two more changes:
(1) Fix `TRUNCATE TABLE` for data source tables (only for the case without `PARTITION`)
(2) Disallow truncating external tables and views
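
For illustration, the intended behavior after this patch looks roughly like the sketch below (hypothetical table names, run through `spark.sql`):

```scala
// Truncating a non-partitioned data source table now works and also
// clears any cached contents of the table:
spark.sql("TRUNCATE TABLE ds_table")

// These are now rejected with "Operation not allowed":
spark.sql("TRUNCATE TABLE ext_table")                    // external table
spark.sql("TRUNCATE TABLE some_view")                    // view
spark.sql("TRUNCATE TABLE ds_table PARTITION (a = 1)")   // PARTITION on a data source table
```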

## How was this patch tested?

`DDLSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #13315 from andrewor14/truncate-table.
parent 3ac2363d
@@ -285,41 +285,67 @@ case class TruncateTableCommand(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand {

-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalog = sparkSession.sessionState.catalog
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
     if (!catalog.tableExists(tableName)) {
       throw new AnalysisException(s"Table '$tableName' in TRUNCATE TABLE does not exist.")
-    } else if (catalog.isTemporaryTable(tableName)) {
+    }
+    if (catalog.isTemporaryTable(tableName)) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE on temporary tables: '$tableName'")
-    } else {
-      val locations = if (partitionSpec.isDefined) {
-        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
-      } else {
-        val table = catalog.getTableMetadata(tableName)
-        if (table.partitionColumnNames.nonEmpty) {
-          catalog.listPartitions(tableName).map(_.storage.locationUri)
-        } else {
-          Seq(table.storage.locationUri)
-        }
-      }
-      val hadoopConf = sparkSession.sessionState.newHadoopConf()
-      locations.foreach { location =>
-        if (location.isDefined) {
-          val path = new Path(location.get)
-          try {
-            val fs = path.getFileSystem(hadoopConf)
-            fs.delete(path, true)
-            fs.mkdirs(path)
-          } catch {
-            case NonFatal(e) =>
-              throw new AnalysisException(
-                s"Failed to truncate table '$tableName' when removing data of the path: $path " +
-                s"because of ${e.toString}")
-          }
-        }
-      }
-    }
+    }
+    val table = catalog.getTableMetadata(tableName)
+    if (table.tableType == CatalogTableType.EXTERNAL) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on external tables: '$tableName'")
+    }
+    if (table.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on views: '$tableName'")
+    }
+    val isDatasourceTable = DDLUtils.isDatasourceTable(table)
+    if (isDatasourceTable && partitionSpec.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+        s"for tables created using the data sources API: '$tableName'")
+    }
+    if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+        s"for tables that are not partitioned: '$tableName'")
+    }
+    val locations =
+      if (isDatasourceTable || table.partitionColumnNames.isEmpty) {
+        Seq(table.storage.locationUri)
+      } else {
+        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
+      }
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    locations.foreach { location =>
+      if (location.isDefined) {
+        val path = new Path(location.get)
+        try {
+          val fs = path.getFileSystem(hadoopConf)
+          fs.delete(path, true)
+          fs.mkdirs(path)
+        } catch {
+          case NonFatal(e) =>
+            throw new AnalysisException(
+              s"Failed to truncate table '$tableName' when removing data of the path: $path " +
+              s"because of ${e.toString}")
+        }
+      }
+    }
+    // After deleting the data, invalidate the table to make sure we don't keep around a stale
+    // file relation in the metastore cache.
+    spark.sessionState.invalidateTable(tableName.unquotedString)
+    // Also try to drop the contents of the table from the columnar cache
+    try {
+      spark.sharedState.cacheManager.tryUncacheQuery(spark.table(tableName.quotedString))
+    } catch {
+      case NonFatal(e) =>
+        log.warn(s"Exception when attempting to uncache table '$tableName'", e)
+    }
     Seq.empty[Row]
   }
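
The core of the command is the delete-and-recreate step over each resolved location. Below is a minimal standalone sketch of just that step, assuming only Hadoop's `FileSystem` API (the object and method names here are illustrative, not Spark internals):

```scala
import java.nio.file.Files

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Standalone illustration of the delete-and-recreate step in
// TruncateTableCommand above (hypothetical names, not Spark API).
object TruncateLocationSketch {
  def truncateLocation(locationUri: String, hadoopConf: Configuration): Unit = {
    val path = new Path(locationUri)
    val fs = path.getFileSystem(hadoopConf)
    fs.delete(path, true) // recursively remove all data files under the location
    fs.mkdirs(path)       // recreate the location as an empty directory
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("truncate-demo")
    Files.createFile(dir.resolve("part-00000"))
    truncateLocation(dir.toUri.toString, new Configuration())
    // The directory still exists but is now empty.
    println(Files.list(dir).count()) // 0
  }
}
```

Recreating the directory after the recursive delete is what distinguishes truncation from a plain drop: the table's location must still exist, just be empty.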
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils

 class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {

   private val escapedIdentifier = "`(.+)`".r
@@ -1109,4 +1110,37 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }

+  test("truncate table - datasource table") {
+    import testImplicits._
+    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
+
+    data.write.saveAsTable("rectangles")
+    spark.catalog.cacheTable("rectangles")
+    assume(spark.table("rectangles").collect().nonEmpty, "bad test; table was empty to begin with")
+    assume(spark.catalog.isCached("rectangles"), "bad test; table was not cached to begin with")
+
+    sql("TRUNCATE TABLE rectangles")
+    assert(spark.table("rectangles").collect().isEmpty)
+    assert(!spark.catalog.isCached("rectangles"))
+
+    // truncating partitioned data source tables is not supported
+    data.write.partitionBy("length").saveAsTable("rectangles2")
+    assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
+    assertUnsupported("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+  }
+
+  test("truncate table - external table, temporary table, view (not allowed)") {
+    import testImplicits._
+    val path = Utils.createTempDir().getAbsolutePath
+    (1 to 10).map { i => (i, i) }.toDF("a", "b").createTempView("my_temp_tab")
+    sql(s"CREATE EXTERNAL TABLE my_ext_tab LOCATION '$path'")
+    sql(s"CREATE VIEW my_view AS SELECT 1")
+    assertUnsupported("TRUNCATE TABLE my_temp_tab")
+    assertUnsupported("TRUNCATE TABLE my_ext_tab")
+    assertUnsupported("TRUNCATE TABLE my_view")
+  }
+
+  test("truncate table - non-partitioned table (not allowed)") {
+    sql("CREATE TABLE my_tab (age INT, name STRING)")
+    assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
+  }
 }
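
For context, `assertUnsupported` is a pre-existing helper in `DDLSuite`, not part of this diff; it behaves roughly like the sketch below, intercepting the `AnalysisException` and checking its message:

```scala
// Rough sketch of the suite's existing assertUnsupported helper
// (reconstructed for context; see DDLSuite for the actual definition).
private def assertUnsupported(query: String): Unit = {
  val e = intercept[AnalysisException] {
    sql(query)
  }
  assert(e.getMessage.toLowerCase.contains("operation not allowed"))
}
```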