From 695647bf2ebda56f9effb7fcdd875490132ea012 Mon Sep 17 00:00:00 2001
From: hyukjinkwon <gurwls223@gmail.com>
Date: Thu, 9 Nov 2017 15:00:31 +0900
Subject: [PATCH] [SPARK-21640][SQL][PYTHON][R][FOLLOWUP] Add errorifexists in
 SparkR and other documentations

## What changes were proposed in this pull request?

This PR proposes to add `errorifexists` to the SparkR API and to fix the remaining places that describe the save mode, mainly in the API documentation.

This PR also replaces `convertToJSaveMode` with `setWriteMode` so that the mode string is passed as-is to the JVM and handled by:

https://github.com/apache/spark/blob/b034f2565f72aa73c9f0be1e49d148bb4cf05153/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L72-L82

which also removes the duplicated mode mapping here:

https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L187-L194

## How was this patch tested?

Manually checked the built documentation. The affected places were mainly found with `` grep -r `error` `` and `grep -r 'error'`. Also, unit tests were added in `test_sparkSQL.R`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19673 from HyukjinKwon/SPARK-21640-followup.
---
 R/pkg/R/DataFrame.R                           | 79 +++++++++++--------
 R/pkg/R/utils.R                               |  9 ---
 R/pkg/tests/fulltests/test_sparkSQL.R         |  8 ++
 R/pkg/tests/fulltests/test_utils.R            |  8 --
 python/pyspark/sql/readwriter.py              | 25 +++---
 .../apache/spark/sql/DataFrameWriter.scala    |  2 +-
 .../org/apache/spark/sql/api/r/SQLUtils.scala |  9 ---
 7 files changed, 71 insertions(+), 69 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 763c8d2548..b8d732a485 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -58,14 +58,23 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
 #' Set options/mode and then return the write object
 #' @noRd
 setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
-  options <- varargsToStrEnv(...)
-  if (!is.null(path)) {
-    options[["path"]] <- path
-  }
-  jmode <- convertToJSaveMode(mode)
-  write <- callJMethod(write, "mode", jmode)
-  write <- callJMethod(write, "options", options)
-  write
+  options <- varargsToStrEnv(...)
+  if (!is.null(path)) {
+    options[["path"]] <- path
+  }
+  write <- setWriteMode(write, mode)
+  write <- callJMethod(write, "options", options)
+  write
+}
+
+#' Set mode and then return the write object
+#' @noRd
+setWriteMode <- function(write, mode) {
+  if (!is.character(mode)) {
+    stop("mode should be character or omitted. It is 'error' by default.")
+  }
+  write <- handledCallJMethod(write, "mode", mode)
+  write
 }
 
 #' @export
@@ -556,9 +565,8 @@ setMethod("registerTempTable",
 setMethod("insertInto",
           signature(x = "SparkDataFrame", tableName = "character"),
           function(x, tableName, overwrite = FALSE) {
-            jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
             write <- callJMethod(x@sdf, "write")
-            write <- callJMethod(write, "mode", jmode)
+            write <- setWriteMode(write, ifelse(overwrite, "overwrite", "append"))
             invisible(callJMethod(write, "insertInto", tableName))
           })
 
@@ -810,7 +818,8 @@ setMethod("toJSON",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
-#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
+#'             save mode (it is 'error' by default)
 #' @param ... additional argument(s) passed to the method.
#' #' @family SparkDataFrame functions @@ -841,7 +850,8 @@ setMethod("write.json", #' #' @param x A SparkDataFrame #' @param path The directory where the file is saved -#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default) +#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore' +#' save mode (it is 'error' by default) #' @param ... additional argument(s) passed to the method. #' #' @family SparkDataFrame functions @@ -872,7 +882,8 @@ setMethod("write.orc", #' #' @param x A SparkDataFrame #' @param path The directory where the file is saved -#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default) +#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore' +#' save mode (it is 'error' by default) #' @param ... additional argument(s) passed to the method. #' #' @family SparkDataFrame functions @@ -917,7 +928,8 @@ setMethod("saveAsParquetFile", #' #' @param x A SparkDataFrame #' @param path The directory where the file is saved -#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default) +#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore' +#' save mode (it is 'error' by default) #' @param ... additional argument(s) passed to the method. #' #' @family SparkDataFrame functions @@ -2871,18 +2883,19 @@ setMethod("except", #' Additionally, mode is used to specify the behavior of the save operation when data already #' exists in the data source. There are four modes: #' \itemize{ -#' \item append: Contents of this SparkDataFrame are expected to be appended to existing data. -#' \item overwrite: Existing data is expected to be overwritten by the contents of this +#' \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data. +#' \item 'overwrite': Existing data is expected to be overwritten by the contents of this #' SparkDataFrame. -#' \item error: An exception is expected to be thrown. -#' \item ignore: The save operation is expected to not save the contents of the SparkDataFrame +#' \item 'error' or 'errorifexists': An exception is expected to be thrown. +#' \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame #' and to not change the existing data. #' } #' #' @param df a SparkDataFrame. #' @param path a name for the table. #' @param source a name for external data source. -#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default) +#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore' +#' save mode (it is 'error' by default) #' @param ... additional argument(s) passed to the method. #' #' @family SparkDataFrame functions @@ -2940,17 +2953,18 @@ setMethod("saveDF", #' #' Additionally, mode is used to specify the behavior of the save operation when #' data already exists in the data source. There are four modes: \cr -#' append: Contents of this SparkDataFrame are expected to be appended to existing data. \cr -#' overwrite: Existing data is expected to be overwritten by the contents of this +#' 'append': Contents of this SparkDataFrame are expected to be appended to existing data. \cr +#' 'overwrite': Existing data is expected to be overwritten by the contents of this #' SparkDataFrame. \cr -#' error: An exception is expected to be thrown. 
\cr
-#' ignore: The save operation is expected to not save the contents of the SparkDataFrame
+#' 'error' or 'errorifexists': An exception is expected to be thrown. \cr
+#' 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
 #' and to not change the existing data. \cr
 #'
 #' @param df a SparkDataFrame.
 #' @param tableName a name for the table.
 #' @param source a name for external data source.
-#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default).
+#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
+#'             save mode (it is 'error' by default)
 #' @param ... additional option(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
@@ -2972,12 +2986,11 @@ setMethod("saveAsTable",
           if (is.null(source)) {
             source <- getDefaultSqlSource()
           }
-          jmode <- convertToJSaveMode(mode)
           options <- varargsToStrEnv(...)
 
           write <- callJMethod(df@sdf, "write")
           write <- callJMethod(write, "format", source)
-          write <- callJMethod(write, "mode", jmode)
+          write <- setWriteMode(write, mode)
           write <- callJMethod(write, "options", options)
           invisible(callJMethod(write, "saveAsTable", tableName))
         })
@@ -3544,18 +3557,19 @@ setMethod("histogram",
 #' Also, mode is used to specify the behavior of the save operation when
 #' data already exists in the data source. There are four modes:
 #' \itemize{
-#' \item append: Contents of this SparkDataFrame are expected to be appended to existing data.
-#' \item overwrite: Existing data is expected to be overwritten by the contents of this
+#' \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data.
+#' \item 'overwrite': Existing data is expected to be overwritten by the contents of this
 #'       SparkDataFrame.
-#' \item error: An exception is expected to be thrown.
-#' \item ignore: The save operation is expected to not save the contents of the SparkDataFrame
+#' \item 'error' or 'errorifexists': An exception is expected to be thrown.
+#' \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
 #'       and to not change the existing data.
 #' }
 #'
 #' @param x a SparkDataFrame.
 #' @param url JDBC database url of the form \code{jdbc:subprotocol:subname}.
 #' @param tableName the name of the table in the external database.
-#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default).
+#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
+#'             save mode (it is 'error' by default)
 #' @param ... additional JDBC database connection properties.
 #' @family SparkDataFrame functions
 #' @rdname write.jdbc
@@ -3572,10 +3586,9 @@ setMethod("write.jdbc",
           signature(x = "SparkDataFrame", url = "character", tableName = "character"),
           function(x, url, tableName, mode = "error", ...) {
-            jmode <- convertToJSaveMode(mode)
             jprops <- varargsToJProperties(...)
write <- callJMethod(x@sdf, "write") - write <- callJMethod(write, "mode", jmode) + write <- setWriteMode(write, mode) invisible(handledCallJMethod(write, "jdbc", url, tableName, jprops)) }) diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index 4b716995f2..fa4099231c 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -736,15 +736,6 @@ splitString <- function(input) { Filter(nzchar, unlist(strsplit(input, ",|\\s"))) } -convertToJSaveMode <- function(mode) { - allModes <- c("append", "overwrite", "error", "ignore") - if (!(mode %in% allModes)) { - stop('mode should be one of "append", "overwrite", "error", "ignore"') # nolint - } - jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode) - jmode -} - varargsToJProperties <- function(...) { pairs <- list(...) props <- newJObject("java.util.Properties") diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 0c8118a7c7..a0dbd475f7 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -630,6 +630,10 @@ test_that("read/write json files", { jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".json") write.df(df, jsonPath2, "json", mode = "overwrite") + # Test errorifexists + expect_error(write.df(df, jsonPath2, "json", mode = "errorifexists"), + "analysis error - path file:.*already exists") + # Test write.json jsonPath3 <- tempfile(pattern = "jsonPath3", fileext = ".json") write.json(df, jsonPath3) @@ -1371,6 +1375,9 @@ test_that("test HiveContext", { expect_equal(count(df5), 3) unlink(parquetDataPath) + # Invalid mode + expect_error(saveAsTable(df, "parquetest", "parquet", mode = "abc", path = parquetDataPath), + "illegal argument - Unknown save mode: abc") unsetHiveContext() } }) @@ -3303,6 +3310,7 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume "Error in orc : analysis error - path file:.*already exists") expect_error(write.parquet(df, jsonPath), "Error in parquet : analysis error - path file:.*already exists") + expect_error(write.parquet(df, jsonPath, mode = 123), "mode should be character or omitted.") # Arguments checking in R side. expect_error(write.df(df, "data.tmp", source = c(1, 2)), diff --git a/R/pkg/tests/fulltests/test_utils.R b/R/pkg/tests/fulltests/test_utils.R index af81423aa8..fb394b8069 100644 --- a/R/pkg/tests/fulltests/test_utils.R +++ b/R/pkg/tests/fulltests/test_utils.R @@ -158,14 +158,6 @@ test_that("varargsToJProperties", { expect_equal(callJMethod(jprops, "size"), 0L) }) -test_that("convertToJSaveMode", { - s <- convertToJSaveMode("error") - expect_true(class(s) == "jobj") - expect_match(capture.output(print.jobj(s)), "Java ref type org.apache.spark.sql.SaveMode id ") - expect_error(convertToJSaveMode("foo"), - 'mode should be one of "append", "overwrite", "error", "ignore"') #nolint -}) - test_that("captureJVMException", { method <- "createStructField" expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", method, diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 3d87567ab6..a75bdf8078 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -540,7 +540,7 @@ class DataFrameWriter(OptionUtils): * `append`: Append contents of this :class:`DataFrame` to existing data. * `overwrite`: Overwrite existing data. - * `error`: Throw an exception if data already exists. + * `error` or `errorifexists`: Throw an exception if data already exists. 
* `ignore`: Silently ignore this operation if data already exists. >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) @@ -675,7 +675,8 @@ class DataFrameWriter(OptionUtils): * ``append``: Append contents of this :class:`DataFrame` to existing data. * ``overwrite``: Overwrite existing data. * ``ignore``: Silently ignore this operation if data already exists. - * ``error`` (default case): Throw an exception if data already exists. + * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \ + exists. :param partitionBy: names of partitioning columns :param options: all other string options @@ -713,12 +714,13 @@ class DataFrameWriter(OptionUtils): * `append`: Append contents of this :class:`DataFrame` to existing data. * `overwrite`: Overwrite existing data. - * `error`: Throw an exception if data already exists. + * `error` or `errorifexists`: Throw an exception if data already exists. * `ignore`: Silently ignore this operation if data already exists. :param name: the table name :param format: the format used to save - :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error) + :param mode: one of `append`, `overwrite`, `error`, `errorifexists`, `ignore` \ + (default: error) :param partitionBy: names of partitioning columns :param options: all other string options """ @@ -741,7 +743,8 @@ class DataFrameWriter(OptionUtils): * ``append``: Append contents of this :class:`DataFrame` to existing data. * ``overwrite``: Overwrite existing data. * ``ignore``: Silently ignore this operation if data already exists. - * ``error`` (default case): Throw an exception if data already exists. + * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \ + exists. :param compression: compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate). @@ -771,7 +774,8 @@ class DataFrameWriter(OptionUtils): * ``append``: Append contents of this :class:`DataFrame` to existing data. * ``overwrite``: Overwrite existing data. * ``ignore``: Silently ignore this operation if data already exists. - * ``error`` (default case): Throw an exception if data already exists. + * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \ + exists. :param partitionBy: names of partitioning columns :param compression: compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, snappy, gzip, and lzo). @@ -814,7 +818,8 @@ class DataFrameWriter(OptionUtils): * ``append``: Append contents of this :class:`DataFrame` to existing data. * ``overwrite``: Overwrite existing data. * ``ignore``: Silently ignore this operation if data already exists. - * ``error`` (default case): Throw an exception if data already exists. + * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \ + exists. :param compression: compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, @@ -874,7 +879,8 @@ class DataFrameWriter(OptionUtils): * ``append``: Append contents of this :class:`DataFrame` to existing data. * ``overwrite``: Overwrite existing data. * ``ignore``: Silently ignore this operation if data already exists. - * ``error`` (default case): Throw an exception if data already exists. + * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \ + exists. 
:param partitionBy: names of partitioning columns :param compression: compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, snappy, zlib, and lzo). @@ -905,7 +911,8 @@ class DataFrameWriter(OptionUtils): * ``append``: Append contents of this :class:`DataFrame` to existing data. * ``overwrite``: Overwrite existing data. * ``ignore``: Silently ignore this operation if data already exists. - * ``error`` (default case): Throw an exception if data already exists. + * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \ + exists. :param properties: a dictionary of JDBC database connection arguments. Normally at least properties "user" and "password" with their corresponding values. For example { 'user' : 'SYSTEM', 'password' : 'mypassword' } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 8d95b24c00..e3fa2ced76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -65,7 +65,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * - `overwrite`: overwrite the existing data. * - `append`: append the data. * - `ignore`: ignore the operation (i.e. no-op). - * - `error`: default option, throw an exception at runtime. + * - `error` or `errorifexists`: default option, throw an exception at runtime. * * @since 1.4.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 872ef773e8..af20764f9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -184,15 +184,6 @@ private[sql] object SQLUtils extends Logging { colArray } - def saveMode(mode: String): SaveMode = { - mode match { - case "append" => SaveMode.Append - case "overwrite" => SaveMode.Overwrite - case "error" => SaveMode.ErrorIfExists - case "ignore" => SaveMode.Ignore - } - } - def readSqlObject(dis: DataInputStream, dataType: Char): Object = { dataType match { case 's' => -- GitLab
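
For reference, a minimal SparkR sketch of the behavior this patch enables. This is not part of the patch itself: it assumes a Spark build that includes this change, and `df`/`path` are illustrative names. The expected error messages are taken from the unit tests added in `test_sparkSQL.R` above.

```r
# Sketch: exercising the new 'errorifexists' save mode from SparkR.
library(SparkR)
sparkR.session()

df <- createDataFrame(mtcars)
path <- file.path(tempdir(), "mtcars_parquet")  # illustrative output path

# First write succeeds because the path does not exist yet.
write.df(df, path, source = "parquet", mode = "errorifexists")

# A second write with the same mode raises an analysis error,
# matching "analysis error - path file:.*already exists".
tryCatch(
  write.df(df, path, source = "parquet", mode = "errorifexists"),
  error = function(e) message(conditionMessage(e))
)

# Non-character modes are now rejected on the R side by setWriteMode():
# "mode should be character or omitted. It is 'error' by default."
tryCatch(
  write.df(df, path, source = "parquet", mode = 123),
  error = function(e) message(conditionMessage(e))
)

sparkR.session.stop()
```

Because `setWriteMode` now passes the string straight to `DataFrameWriter.mode` on the JVM side, any mode accepted there (including `errorifexists`) works from R without a separate R-side whitelist, and unknown modes surface as an "illegal argument - Unknown save mode" error from the JVM.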