diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 50ae9667f4c8f89835949a79665ed7a5d7d35559..1dd8818dedb2ef64988d99dc3d63fdcb6c54eaba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -281,7 +281,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * @since 1.4.0
    */
   def save(): Unit = {
-    assertNotBucketed()
+    assertNotBucketed("save")
     assertNotStreaming("save() can only be called on non-continuous queries")
     val dataSource = DataSource(
       df.sparkSession,
@@ -330,7 +330,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    */
   @Experimental
   def startStream(): ContinuousQuery = {
-    assertNotBucketed()
+    assertNotBucketed("startStream")
     assertStreaming("startStream() can only be called on continuous queries")
 
     if (source == "memory") {
@@ -430,7 +430,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   private def insertInto(tableIdent: TableIdentifier): Unit = {
-    assertNotBucketed()
+    assertNotBucketed("insertInto")
     assertNotStreaming("insertInto() can only be called on non-continuous queries")
     val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap)
     val overwrite = mode == SaveMode.Overwrite
@@ -500,10 +500,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         s"existing columns (${validColumnNames.mkString(", ")})"))
   }
 
-  private def assertNotBucketed(): Unit = {
+  private def assertNotBucketed(operation: String): Unit = {
     if (numBuckets.isDefined || sortColumnNames.isDefined) {
       throw new IllegalArgumentException(
-        "Currently we don't support writing bucketed data to this data source.")
+        s"'$operation' does not support bucketing right now.")
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index a2aac69064f9dc5c22ca3574aa0ea1ee32bd092c..431a943304f5bad405d6fe6b726f5cf56d5b1464 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -456,7 +456,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .stream()
     val w = df.write
     val e = intercept[IllegalArgumentException](w.bucketBy(1, "text").startStream())
-    assert(e.getMessage == "Currently we don't support writing bucketed data to this data source.")
+    assert(e.getMessage == "'startStream' does not support bucketing right now.")
   }
 
   test("check sortBy() can only be called on non-continuous queries;") {
@@ -465,7 +465,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .stream()
     val w = df.write
     val e = intercept[IllegalArgumentException](w.sortBy("text").startStream())
-    assert(e.getMessage == "Currently we don't support writing bucketed data to this data source.")
+    assert(e.getMessage == "'startStream' does not support bucketing right now.")
   }
 
   test("check save(path) can only be called on non-continuous queries") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index ff44c6f29497d2eb60b64319a25208e828c6b9ee..61a281db852386965fa95c754ffb00daa5f84f6c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -59,11 +59,22 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     intercept[SparkException](df.write.bucketBy(3, "i").format("text").saveAsTable("tt"))
   }
 
-  test("write bucketed data to non-hive-table or existing hive table") {
+  test("write bucketed data using save()") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").parquet("/tmp/path"))
-    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").json("/tmp/path"))
-    intercept[IllegalArgumentException](df.write.bucketBy(2, "i").insertInto("tt"))
+
+    val e = intercept[IllegalArgumentException] {
+      df.write.bucketBy(2, "i").parquet("/tmp/path")
+    }
+    assert(e.getMessage == "'save' does not support bucketing right now.")
+  }
+
+  test("write bucketed data using insertInto()") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+
+    val e = intercept[IllegalArgumentException] {
+      df.write.bucketBy(2, "i").insertInto("tt")
+    }
+    assert(e.getMessage == "'insertInto' does not support bucketing right now.")
   }
 
   private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")