diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index b7fdae58de4594cb62e586890d8b24a0249af6f8..232f5cf31f319721a191db0434c026618283cce3 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -429,6 +429,7 @@ export("structField",
        "structField.character",
        "print.structField",
        "structType",
+       "structType.character",
        "structType.jobj",
        "structType.structField",
        "print.structType")
@@ -465,5 +466,6 @@ S3method(print, summary.GBTRegressionModel)
 S3method(print, summary.GBTClassificationModel)
 S3method(structField, character)
 S3method(structField, jobj)
+S3method(structType, character)
 S3method(structType, jobj)
 S3method(structType, structField)
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 3b9d42d6e715855c167c0f38b317c1e97375227c..e7a166c3014c19532a009b79a8edecefe27c6266 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1391,6 +1391,10 @@ setMethod("summarize",
           })

 dapplyInternal <- function(x, func, schema) {
+  if (is.character(schema)) {
+    schema <- structType(schema)
+  }
+
   packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                                connection = NULL)

@@ -1408,6 +1412,8 @@ dapplyInternal <- function(x, func, schema) {
   dataFrame(sdf)
 }

+setClassUnion("characterOrstructType", c("character", "structType"))
+
 #' dapply
 #'
 #' Apply a function to each partition of a SparkDataFrame.
@@ -1418,10 +1424,11 @@ dapplyInternal <- function(x, func, schema) {
 #'             to each partition will be passed.
 #'             The output of func should be a R data.frame.
 #' @param schema The schema of the resulting SparkDataFrame after the function is applied.
-#'               It must match the output of func.
+#'               It must match the output of func. Since Spark 2.3, a DDL-formatted string
+#'               is also supported for the schema.
 #' @family SparkDataFrame functions
 #' @rdname dapply
-#' @aliases dapply,SparkDataFrame,function,structType-method
+#' @aliases dapply,SparkDataFrame,function,characterOrstructType-method
 #' @name dapply
 #' @seealso \link{dapplyCollect}
 #' @export
@@ -1444,6 +1451,17 @@ dapplyInternal <- function(x, func, schema) {
 #'   y <- cbind(y, y[1] + 1L)
 #'   },
 #'   schema)
+#'
+#' # The schema can also be specified as a DDL-formatted string.
+#' schema <- "a INT, b DOUBLE, c STRING, d INT"
+#' df1 <- dapply(
+#'   df,
+#'   function(x) {
+#'     y <- x[x[1] > 1, ]
+#'     y <- cbind(y, y[1] + 1L)
+#'   },
+#'   schema)
+#'
 #' collect(df1)
 #' # the result
 #' # a b c d
@@ -1452,7 +1470,7 @@ dapplyInternal <- function(x, func, schema) {
 #' }
 #' @note dapply since 2.0.0
 setMethod("dapply",
-          signature(x = "SparkDataFrame", func = "function", schema = "structType"),
+          signature(x = "SparkDataFrame", func = "function", schema = "characterOrstructType"),
           function(x, func, schema) {
             dapplyInternal(x, func, schema)
           })
@@ -1522,6 +1540,7 @@ setMethod("dapplyCollect",
 #' @param schema the schema of the resulting SparkDataFrame after the function is applied.
 #'               The schema must match to output of \code{func}. It has to be defined for each
 #'               output column with preferred output column name and corresponding data type.
+#'               Since Spark 2.3, a DDL-formatted string is also supported for the schema.
 #' @return A SparkDataFrame.
 #' @family SparkDataFrame functions
 #' @aliases gapply,SparkDataFrame-method
@@ -1541,7 +1560,7 @@ setMethod("dapplyCollect",
 #'
 #' Here our output contains three columns, the key which is a combination of two
 #' columns with data types integer and string and the mean which is a double.
-#' schema <- structType(structField("a", "integer"), structField("c", "string"), 
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
 #'                      structField("avg", "double"))
 #' result <- gapply(
 #'   df,
@@ -1550,6 +1569,15 @@ setMethod("dapplyCollect",
 #'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
 #'   }, schema)
 #'
+#' The schema can also be specified as a DDL-formatted string.
+#' schema <- "a INT, c STRING, avg DOUBLE"
+#' result <- gapply(
+#'   df,
+#'   c("a", "c"),
+#'   function(key, x) {
+#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'   }, schema)
+#'
 #' We can also group the data and afterwards call gapply on GroupedData.
 #' For Example:
 #' gdf <- group_by(df, "a", "c")
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index f28d26a51baa04c7a82a76aa09c225a4e0d5f56a..86507f13f038dfadeb6075c2f98b4f02cf1604db 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2174,8 +2174,9 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #'
 #' @rdname column_collection_functions
 #' @param schema a structType object to use as the schema to use when parsing the JSON string.
+#'               Since Spark 2.3, a DDL-formatted string is also supported for the schema.
 #' @param as.json.array indicating if input string is JSON array of objects or a single object.
-#' @aliases from_json from_json,Column,structType-method
+#' @aliases from_json from_json,Column,characterOrstructType-method
 #' @export
 #' @examples
 #'
@@ -2188,10 +2189,15 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #' df2 <- sql("SELECT named_struct('name', 'Bob') as people")
 #' df2 <- mutate(df2, people_json = to_json(df2$people))
 #' schema <- structType(structField("name", "string"))
-#' head(select(df2, from_json(df2$people_json, schema)))}
+#' head(select(df2, from_json(df2$people_json, schema)))
+#' head(select(df2, from_json(df2$people_json, "name STRING")))}
 #' @note from_json since 2.2.0
-setMethod("from_json", signature(x = "Column", schema = "structType"),
+setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
           function(x, schema, as.json.array = FALSE, ...) {
+            if (is.character(schema)) {
+              schema <- structType(schema)
+            }
+
             if (as.json.array) {
               jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
                                      "createArrayType",
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
index 17f5283abead155ea3eb70684a83d4ef3ac85c65..0a7be0e9939754454684c4d9fc075fc2db373cea 100644
--- a/R/pkg/R/group.R
+++ b/R/pkg/R/group.R
@@ -233,6 +233,9 @@ setMethod("gapplyCollect",
           })

 gapplyInternal <- function(x, func, schema) {
+  if (is.character(schema)) {
+    schema <- structType(schema)
+  }
   packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                                connection = NULL)
   broadcastArr <- lapply(ls(.broadcastNames),
diff --git a/R/pkg/R/schema.R b/R/pkg/R/schema.R
index cb5bdb90175bf4a896c162ebb59b9f42a1ab7fac..d1ed6833d5d02061f5ed2310ec90e159d7a41527 100644
--- a/R/pkg/R/schema.R
+++ b/R/pkg/R/schema.R
@@ -23,18 +23,24 @@
 #' Create a structType object that contains the metadata for a SparkDataFrame. Intended for
 #' use with createDataFrame and toDF.
 #'
-#' @param x a structField object (created with the field() function)
+#' @param x a structField object (created with the \code{structField} method). Since Spark 2.3,
+#'          this can be a DDL-formatted string, which is a comma-separated list of field
+#'          definitions, e.g., "a INT, b STRING".
 #' @param ... additional structField objects
 #' @return a structType object
 #' @rdname structType
 #' @export
 #' @examples
 #'\dontrun{
-#' schema <- structType(structField("a", "integer"), structField("c", "string"), 
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
 #'                      structField("avg", "double"))
 #' df1 <- gapply(df, list("a", "c"),
 #'               function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
 #'               schema)
+#' schema <- structType("a INT, c STRING, avg DOUBLE")
+#' df1 <- gapply(df, list("a", "c"),
+#'               function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
+#'               schema)
 #' }
 #' @note structType since 1.4.0
 structType <- function(x, ...) {
@@ -68,6 +74,23 @@ structType.structField <- function(x, ...) {
   structType(stObj)
 }

+#' @rdname structType
+#' @method structType character
+#' @export
+structType.character <- function(x, ...) {
+  if (!is.character(x)) {
+    stop("schema must be a DDL-formatted string.")
+  }
+  if (length(list(...)) > 0) {
+    stop("multiple DDL-formatted strings are not supported.")
+  }
+
+  stObj <- handledCallJStatic("org.apache.spark.sql.types.StructType",
+                              "fromDDL",
+                              x)
+  structType(stObj)
+}
+
 #' Print a Spark StructType.
 #'
 #' This function prints the contents of a StructType returned from the
@@ -102,7 +125,7 @@
 #' field1 <- structField("a", "integer")
 #' field2 <- structField("c", "string")
 #' field3 <- structField("avg", "double")
-#' schema <- structType(field1, field2, field3) 
+#' schema <- structType(field1, field2, field3)
 #' df1 <- gapply(df, list("a", "c"),
 #'               function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
 #'               schema)
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index a2bcb5aefe16de65303ba3691788252ada990b17..77052d4a2834581c0afdfe044686cf6da1d50137 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -146,6 +146,13 @@ test_that("structType and structField", {
   expect_is(testSchema, "structType")
   expect_is(testSchema$fields()[[2]], "structField")
   expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")
+
+  testSchema <- structType("a STRING, b INT")
+  expect_is(testSchema, "structType")
+  expect_is(testSchema$fields()[[2]], "structField")
+  expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")
+
+  expect_error(structType("A stri"), "DataType stri is not supported.")
 })

 test_that("structField type strings", {
@@ -1480,13 +1487,15 @@ test_that("column functions", {
   j <- collect(select(df, alias(to_json(df$info), "json")))
   expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
   df <- as.DataFrame(j)
-  schema <- structType(structField("age", "integer"),
-                       structField("height", "double"))
-  s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
-  expect_equal(ncol(s), 1)
-  expect_equal(nrow(s), 3)
-  expect_is(s[[1]][[1]], "struct")
-  expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
+  schemas <- list(structType(structField("age", "integer"), structField("height", "double")),
+                  "age INT, height DOUBLE")
+  for (schema in schemas) {
+    s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
+    expect_equal(ncol(s), 1)
+    expect_equal(nrow(s), 3)
+    expect_is(s[[1]][[1]], "struct")
+    expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
+  }

   # passing option
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
@@ -1504,14 +1513,15 @@ test_that("column functions", {
test_that("column functions", { # check if array type in string is correctly supported. jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]" df <- as.DataFrame(list(list("people" = jsonArr))) - schema <- structType(structField("name", "string")) - arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol"))) - expect_equal(ncol(arr), 1) - expect_equal(nrow(arr), 1) - expect_is(arr[[1]][[1]], "list") - expect_equal(length(arr$arrcol[[1]]), 2) - expect_equal(arr$arrcol[[1]][[1]]$name, "Bob") - expect_equal(arr$arrcol[[1]][[2]]$name, "Alice") + for (schema in list(structType(structField("name", "string")), "name STRING")) { + arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol"))) + expect_equal(ncol(arr), 1) + expect_equal(nrow(arr), 1) + expect_is(arr[[1]][[1]], "list") + expect_equal(length(arr$arrcol[[1]]), 2) + expect_equal(arr$arrcol[[1]][[1]]$name, "Bob") + expect_equal(arr$arrcol[[1]][[2]]$name, "Alice") + } # Test create_array() and create_map() df <- as.DataFrame(data.frame( @@ -2885,30 +2895,33 @@ test_that("dapply() and dapplyCollect() on a DataFrame", { expect_identical(ldf, result) # Filter and add a column - schema <- structType(structField("a", "integer"), structField("b", "double"), - structField("c", "string"), structField("d", "integer")) - df1 <- dapply( - df, - function(x) { - y <- x[x$a > 1, ] - y <- cbind(y, y$a + 1L) - }, - schema) - result <- collect(df1) - expected <- ldf[ldf$a > 1, ] - expected$d <- expected$a + 1L - rownames(expected) <- NULL - expect_identical(expected, result) - - result <- dapplyCollect( - df, - function(x) { - y <- x[x$a > 1, ] - y <- cbind(y, y$a + 1L) - }) - expected1 <- expected - names(expected1) <- names(result) - expect_identical(expected1, result) + schemas <- list(structType(structField("a", "integer"), structField("b", "double"), + structField("c", "string"), structField("d", "integer")), + "a INT, b DOUBLE, c STRING, d INT") + for (schema in schemas) { + df1 <- dapply( + df, + function(x) { + y <- x[x$a > 1, ] + y <- cbind(y, y$a + 1L) + }, + schema) + result <- collect(df1) + expected <- ldf[ldf$a > 1, ] + expected$d <- expected$a + 1L + rownames(expected) <- NULL + expect_identical(expected, result) + + result <- dapplyCollect( + df, + function(x) { + y <- x[x$a > 1, ] + y <- cbind(y, y$a + 1L) + }) + expected1 <- expected + names(expected1) <- names(result) + expect_identical(expected1, result) + } # Remove the added column df2 <- dapply( @@ -3020,29 +3033,32 @@ test_that("gapply() and gapplyCollect() on a DataFrame", { # Computes the sum of second column by grouping on the first and third columns # and checks if the sum is larger than 2 - schema <- structType(structField("a", "integer"), structField("e", "boolean")) - df2 <- gapply( - df, - c(df$"a", df$"c"), - function(key, x) { - y <- data.frame(key[1], sum(x$b) > 2) - }, - schema) - actual <- collect(df2)$e - expected <- c(TRUE, TRUE) - expect_identical(actual, expected) - - df2Collect <- gapplyCollect( - df, - c(df$"a", df$"c"), - function(key, x) { - y <- data.frame(key[1], sum(x$b) > 2) - colnames(y) <- c("a", "e") - y - }) - actual <- df2Collect$e + schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")), + "a INT, e BOOLEAN") + for (schema in schemas) { + df2 <- gapply( + df, + c(df$"a", df$"c"), + function(key, x) { + y <- data.frame(key[1], sum(x$b) > 2) + }, + schema) + actual <- collect(df2)$e + expected <- c(TRUE, TRUE) expect_identical(actual, expected) + 
+    df2Collect <- gapplyCollect(
+      df,
+      c(df$"a", df$"c"),
+      function(key, x) {
+        y <- data.frame(key[1], sum(x$b) > 2)
+        colnames(y) <- c("a", "e")
+        y
+      })
+    actual <- df2Collect$e
+    expect_identical(actual, expected)
+  }
+
   # Computes the arithmetic mean of the second column by grouping
   # on the first and third columns. Output the groupping value and the average.
   schema <- structType(structField("a", "integer"), structField("c", "string"),
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 5d8ded83f667d055273b601b1e19059b0d522f8d..f3e7d033e97cf9727d4f11982584e906f6e71257 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1883,15 +1883,20 @@ def from_json(col, schema, options={}):
     string.

     :param col: string column in json format
-    :param schema: a StructType or ArrayType of StructType to use when parsing the json column
+    :param schema: a StructType or ArrayType of StructType to use when parsing the json column.
     :param options: options to control parsing. accepts the same options as the json datasource

+    .. note:: Since Spark 2.3, a DDL-formatted string or a JSON format string is also
+        supported for ``schema``.
+
     >>> from pyspark.sql.types import *
     >>> data = [(1, '''{"a": 1}''')]
     >>> schema = StructType([StructField("a", IntegerType())])
     >>> df = spark.createDataFrame(data, ("key", "value"))
     >>> df.select(from_json(df.value, schema).alias("json")).collect()
     [Row(json=Row(a=1))]
+    >>> df.select(from_json(df.value, "a INT").alias("json")).collect()
+    [Row(json=Row(a=1))]
     >>> data = [(1, '''[{"a": 1}]''')]
     >>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
     >>> df = spark.createDataFrame(data, ("key", "value"))
@@ -1900,7 +1905,9 @@ def from_json(col, schema, options={}):
     """

     sc = SparkContext._active_spark_context
-    jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options)
+    if isinstance(schema, DataType):
+        schema = schema.json()
+    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
     return Column(jc)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 0c7b483f5c83647b0dd8082f819fb9c179b262a3..ebdeb42b0bfb1aa56f1a7fd06f72e0cb73eee977 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2114,7 +2114,7 @@ object functions {
    * Calculates the hash code of given columns, and returns the result as an int column.
    *
    * @group misc_funcs
-   * @since 2.0
+   * @since 2.0.0
    */
   @scala.annotation.varargs
   def hash(cols: Column*): Column = withExpr {
@@ -3074,9 +3074,8 @@ object functions {
    * string.
    *
    * @param e a string column containing JSON data.
-   * @param schema the schema to use when parsing the json string as a json string. In Spark 2.1,
-   *               the user-provided schema has to be in JSON format. Since Spark 2.2, the DDL
-   *               format is also supported for the schema.
+   * @param schema the schema to use when parsing the json string. It can be a JSON format
+   *               string or a DDL-formatted string.
    *
    * @group collection_funcs
    * @since 2.3.0
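
As a quick end-to-end check of the DDL-string schema path added above, the SparkR snippet below exercises structType, dapply, and from_json with string schemas. It is a minimal sketch, assuming a Spark build with this patch applied; the input data frame and column names are illustrative only and are not part of the patch.

library(SparkR)
sparkR.session()

# A local data frame with known column types.
ldf <- data.frame(a = 1:3, b = c(1, 2, 3), c = c("1", "2", "3"), stringsAsFactors = FALSE)
df <- createDataFrame(ldf)

# structType() now accepts a DDL-formatted string directly.
schema <- structType("a INT, b DOUBLE, c STRING, d INT")
print(schema)

# dapply() takes either the structType object or the raw DDL string.
df1 <- dapply(df, function(x) cbind(x, x$a + 1L), "a INT, b DOUBLE, c STRING, d INT")
head(df1)

# from_json() likewise accepts a DDL-formatted string as its schema.
df2 <- sql("SELECT named_struct('name', 'Bob') AS people")
df2 <- mutate(df2, people_json = to_json(df2$people))
head(select(df2, from_json(df2$people_json, "name STRING")))

sparkR.session.stop()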