From 0cdcf9114527a2c359c25e46fd6556b3855bfb28 Mon Sep 17 00:00:00 2001
From: hyukjinkwon <gurwls223@gmail.com>
Date: Sun, 19 Mar 2017 22:33:01 -0700
Subject: [PATCH] [SPARK-19849][SQL] Support ArrayType in to_json to produce
 JSON array

## What changes were proposed in this pull request?

This PR proposes to support an array of struct type in `to_json` as below:

```scala
import org.apache.spark.sql.functions._

val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a")
df.select(to_json($"a").as("json")).show()
```

```
+----------+
|      json|
+----------+
|[{"_1":1}]|
+----------+
```

Currently, it throws an exception as below (a newline manually inserted for readability):

```
org.apache.spark.sql.AnalysisException: cannot resolve 'structtojson(`array`)' due to data type mismatch:
structtojson requires that the expression is a struct expression.;;
```

This allows the roundtrip with `from_json` as below:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val df = Seq("""[{"a":1}, {"a":2}]""").toDF("json").select(from_json($"json", schema).as("array"))
df.show()

// Read back.
df.select(to_json($"array").as("json")).show()
```

```
+----------+
|     array|
+----------+
|[[1], [2]]|
+----------+

+-----------------+
|             json|
+-----------------+
|[{"a":1},{"a":2}]|
+-----------------+
```

Also, this PR proposes to rename `StructToJson` to `StructsToJson` and `JsonToStruct` to `JsonToStructs`.

## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite` for Scala, a doctest for Python, and a test in `test_sparkSQL.R` for R.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17192 from HyukjinKwon/SPARK-19849.
---
 R/pkg/R/functions.R                           | 18 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R     |  4 +
 python/pyspark/sql/functions.py               | 15 ++-
 .../catalyst/analysis/FunctionRegistry.scala  |  4 +-
 .../expressions/jsonExpressions.scala         | 70 +++++++++-----
 .../sql/catalyst/json/JacksonGenerator.scala  | 23 +++--
 .../expressions/JsonExpressionsSuite.scala    | 77 ++++++++++-----
 .../org/apache/spark/sql/functions.scala      | 34 ++++---
 .../sql-tests/inputs/json-functions.sql       |  1 +
 .../sql-tests/results/json-functions.sql.out  | 96 ++++++++++---------
 .../apache/spark/sql/JsonFunctionsSuite.scala | 26 ++++-
 11 files changed, 236 insertions(+), 132 deletions(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 9867f2d5b7..2cff3ac08c 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -1795,10 +1795,10 @@ setMethod("to_date",
 #' to_json
 #'
-#' Converts a column containing a \code{structType} into a Column of JSON string.
-#' Resolving the Column can fail if an unsupported type is encountered.
+#' Converts a column containing a \code{structType} or array of \code{structType} into a Column
+#' of JSON string. Resolving the Column can fail if an unsupported type is encountered.
 #'
-#' @param x Column containing the struct
+#' @param x Column containing the struct or array of structs
 #' @param ... additional named properties to control how it is converted, accepts the same options
 #' as the JSON data source.
 #'
@@ -1809,8 +1809,13 @@ setMethod("to_date",
 #' @export
 #' @examples
 #' \dontrun{
-#' to_json(df$t, dateFormat = 'dd/MM/yyyy')
-#' select(df, to_json(df$t))
+#' # Converts a struct into a JSON object
+#' df <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d")
+#' select(df, to_json(df$d, dateFormat = 'dd/MM/yyyy'))
+#'
+#' # Converts an array of structs into a JSON array
+#' df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
+#' select(df, to_json(df$people))
 #'}
 #' @note to_json since 2.2.0
 setMethod("to_json", signature(x = "Column"),
@@ -2433,7 +2438,8 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #' from_json
 #'
 #' Parses a column containing a JSON string into a Column of \code{structType} with the specified
-#' \code{schema}. If the string is unparseable, the Column will contains the value NA.
+#' \code{schema} or array of \code{structType} if \code{asJsonArray} is set to \code{TRUE}.
+#' If the string is unparseable, the Column will contain the value NA.
 #'
 #' @param x Column containing the JSON string.
 #' @param schema a structType object to use as the schema to use when parsing the JSON string.
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 32856b399c..9c38e0d866 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1340,6 +1340,10 @@ test_that("column functions", {
   expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)
 
   # Test to_json(), from_json()
+  df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
+  j <- collect(select(df, alias(to_json(df$people), "json")))
+  expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
+
   df <- read.json(mapTypeJsonPath)
   j <- collect(select(df, alias(to_json(df$info), "json")))
   expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 376b86ea69..f9121e60f3 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1774,10 +1774,11 @@ def json_tuple(col, *fields):
 def from_json(col, schema, options={}):
     """
     Parses a column containing a JSON string into a [[StructType]] or [[ArrayType]]
-    with the specified schema. Returns `null`, in the case of an unparseable string.
+    of [[StructType]]s with the specified schema. Returns `null`, in the case of an unparseable
+    string.
 
     :param col: string column in json format
-    :param schema: a StructType or ArrayType to use when parsing the json column
+    :param schema: a StructType or ArrayType of StructType to use when parsing the json column
     :param options: options to control parsing. accepts the same options as the json datasource
 
     >>> from pyspark.sql.types import *
@@ -1802,10 +1803,10 @@ def from_json(col, schema, options={}):
 @since(2.1)
 def to_json(col, options={}):
     """
-    Converts a column containing a [[StructType]] into a JSON string. Throws an exception,
-    in the case of an unsupported type.
+    Converts a column containing a [[StructType]] or [[ArrayType]] of [[StructType]]s into a
+    JSON string. Throws an exception, in the case of an unsupported type.
 
-    :param col: name of column containing the struct
+    :param col: name of column containing the struct or array of structs
     :param options: options to control converting.
                    accepts the same options as the json datasource
 
     >>> from pyspark.sql import Row
@@ -1814,6 +1815,10 @@ def to_json(col, options={}):
     >>> df = spark.createDataFrame(data, ("key", "value"))
     >>> df.select(to_json(df.value).alias("json")).collect()
     [Row(json=u'{"age":2,"name":"Alice"}')]
+    >>> data = [(1, [Row(name='Alice', age=2), Row(name='Bob', age=3)])]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_json(df.value).alias("json")).collect()
+    [Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')]
     """
     sc = SparkContext._active_spark_context
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 0486e67dbd..e1d83a86f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -425,8 +425,8 @@ object FunctionRegistry {
     expression[BitwiseXor]("^"),
 
     // json
-    expression[StructToJson]("to_json"),
-    expression[JsonToStruct]("from_json"),
+    expression[StructsToJson]("to_json"),
+    expression[JsonToStructs]("from_json"),
 
     // Cast aliases (SPARK-16730)
     castAlias("boolean", BooleanType),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 37e4bb5060..e4e08a8665 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, ParseModes}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -482,7 +482,8 @@ case class JsonTuple(children: Seq[Expression])
 }
 
 /**
- * Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema.
+ * Converts a json input string to a [[StructType]] or [[ArrayType]] of [[StructType]]s
+ * with the specified schema.
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
@@ -495,7 +496,7 @@ case class JsonTuple(children: Seq[Expression])
       {"time":"2015-08-26 00:00:00.0"}
   """)
 // scalastyle:on line.size.limit
-case class JsonToStruct(
+case class JsonToStructs(
     schema: DataType,
     options: Map[String, String],
     child: Expression,
@@ -590,7 +591,7 @@ case class JsonToStruct(
 }
 
 /**
- * Converts a [[StructType]] to a json output string.
+ * Converts a [[StructType]] or [[ArrayType]] of [[StructType]]s to a json output string.
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
@@ -601,9 +602,11 @@ case class JsonToStruct(
       {"a":1,"b":2}
       > SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
       {"time":"26/08/2015"}
+      > SELECT _FUNC_(array(named_struct('a', 1, 'b', 2)));
+       [{"a":1,"b":2}]
   """)
 // scalastyle:on line.size.limit
-case class StructToJson(
+case class StructsToJson(
     options: Map[String, String],
     child: Expression,
     timeZoneId: Option[String] = None)
@@ -624,41 +627,58 @@ case class StructToJson(
   lazy val writer = new CharArrayWriter()
 
   @transient
-  lazy val gen =
-    new JacksonGenerator(
-      child.dataType.asInstanceOf[StructType],
-      writer,
-      new JSONOptions(options, timeZoneId.get))
+  lazy val gen = new JacksonGenerator(
+    rowSchema, writer, new JSONOptions(options, timeZoneId.get))
+
+  @transient
+  lazy val rowSchema = child.dataType match {
+    case st: StructType => st
+    case ArrayType(st: StructType, _) => st
+  }
+
+  // This converts rows to the JSON output according to the given schema.
+  @transient
+  lazy val converter: Any => UTF8String = {
+    def getAndReset(): UTF8String = {
+      gen.flush()
+      val json = writer.toString
+      writer.reset()
+      UTF8String.fromString(json)
+    }
+
+    child.dataType match {
+      case _: StructType =>
+        (row: Any) =>
+          gen.write(row.asInstanceOf[InternalRow])
+          getAndReset()
+      case ArrayType(_: StructType, _) =>
+        (arr: Any) =>
+          gen.write(arr.asInstanceOf[ArrayData])
+          getAndReset()
+    }
+  }
 
   override def dataType: DataType = StringType
 
-  override def checkInputDataTypes(): TypeCheckResult = {
-    if (StructType.acceptsType(child.dataType)) {
+  override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
+    case _: StructType | ArrayType(_: StructType, _) =>
       try {
-        JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType])
+        JacksonUtils.verifySchema(rowSchema)
         TypeCheckResult.TypeCheckSuccess
       } catch {
         case e: UnsupportedOperationException =>
           TypeCheckResult.TypeCheckFailure(e.getMessage)
       }
-    } else {
-      TypeCheckResult.TypeCheckFailure(
-        s"$prettyName requires that the expression is a struct expression.")
-    }
+    case _ => TypeCheckResult.TypeCheckFailure(
+      s"Input type ${child.dataType.simpleString} must be a struct or array of structs.")
   }
 
   override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
     copy(timeZoneId = Option(timeZoneId))
 
-  override def nullSafeEval(row: Any): Any = {
-    gen.write(row.asInstanceOf[InternalRow])
-    gen.flush()
-    val json = writer.toString
-    writer.reset()
-    UTF8String.fromString(json)
-  }
+  override def nullSafeEval(value: Any): Any = converter(value)
 
-  override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
+  override def inputTypes: Seq[AbstractDataType] = TypeCollection(ArrayType, StructType) :: Nil
 }
 
 object JsonExprUtils {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index dec55279c9..1d302aea6f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -37,6 +37,10 @@ private[sql] class JacksonGenerator(
   // `ValueWriter`s for all fields of the schema
   private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
 
+  // `ValueWriter` for array data whose elements are rows of the schema.
+  private val arrElementWriter: ValueWriter = (arr: SpecializedGetters, i: Int) => {
+    writeObject(writeFields(arr.getStruct(i, schema.length), schema, rootFieldWriters))
+  }
 
   private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
 
@@ -185,17 +189,18 @@ private[sql] class JacksonGenerator(
   def flush(): Unit = gen.flush()
 
   /**
-   * Transforms a single InternalRow to JSON using Jackson
+   * Transforms a single `InternalRow` to a JSON object using Jackson
    *
    * @param row The row to convert
    */
-  def write(row: InternalRow): Unit = {
-    writeObject {
-      writeFields(row, schema, rootFieldWriters)
-    }
-  }
+  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters))
 
-  def writeLineEnding(): Unit = {
-    gen.writeRaw('\n')
-  }
+  /**
+   * Transforms multiple `InternalRow`s to a JSON array using Jackson
+   *
+   * @param array The array of rows to convert
+   */
+  def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter))
+
+  def writeLineEnding(): Unit = gen.writeRaw('\n')
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 19d0c8eb92..e4698d4463 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -21,7 +21,7 @@ import java.util.Calendar
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, ParseModes}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -352,7 +352,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val jsonData = """{"a": 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
       InternalRow(1)
     )
   }
@@ -361,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val jsonData = """{"a" 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
       null
     )
 
     // Other modes should still return `null`.
checkEvaluation( - JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId), + JsonToStructs(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId), null ) } @@ -376,62 +376,62 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val input = """[{"a": 1}, {"a": 2}]""" val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) val output = InternalRow(1) :: InternalRow(2) :: Nil - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=object, schema=array, output=array of single row") { val input = """{"a": 1}""" val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) val output = InternalRow(1) :: Nil - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=empty array, schema=array, output=empty array") { val input = "[ ]" val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) val output = Nil - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=empty object, schema=array, output=array of single row with null") { val input = "{ }" val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) val output = InternalRow(null) :: Nil - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=array of single object, schema=struct, output=single row") { val input = """[{"a": 1}]""" val schema = StructType(StructField("a", IntegerType) :: Nil) val output = InternalRow(1) - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=array, schema=struct, output=null") { val input = """[{"a": 1}, {"a": 2}]""" val schema = StructType(StructField("a", IntegerType) :: Nil) val output = null - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=empty array, schema=struct, output=null") { val input = """[]""" val schema = StructType(StructField("a", IntegerType) :: Nil) val output = null - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json - input=empty object, schema=struct, output=single row with null") { val input = """{ }""" val schema = StructType(StructField("a", IntegerType) :: Nil) val output = InternalRow(null) - checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output) + checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output) } test("from_json null input column") { val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId), + JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId), null ) } @@ -444,14 +444,14 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper { c.set(2016, 0, 1, 0, 0, 0) c.set(Calendar.MILLISECOND, 123) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId), + JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId), InternalRow(c.getTimeInMillis * 1000L) ) // The result doesn't change because the json string includes timezone string ("Z" here), // which means the string represents the timestamp string in the timezone regardless of // the timeZoneId parameter. checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")), + JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")), InternalRow(c.getTimeInMillis * 1000L) ) @@ -461,7 +461,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { c.set(2016, 0, 1, 0, 0, 0) c.set(Calendar.MILLISECOND, 0) checkEvaluation( - JsonToStruct( + JsonToStructs( schema, Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"), Literal(jsonData2), @@ -469,7 +469,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { InternalRow(c.getTimeInMillis * 1000L) ) checkEvaluation( - JsonToStruct( + JsonToStructs( schema, Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", DateTimeUtils.TIMEZONE_OPTION -> tz.getID), @@ -483,25 +483,52 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("SPARK-19543: from_json empty input column") { val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId), + JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId), null ) } - test("to_json") { + test("to_json - struct") { val schema = StructType(StructField("a", IntegerType) :: Nil) val struct = Literal.create(create_row(1), schema) checkEvaluation( - StructToJson(Map.empty, struct, gmtId), + StructsToJson(Map.empty, struct, gmtId), """{"a":1}""" ) } + test("to_json - array") { + val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil) + val output = """[{"a":1},{"a":2}]""" + checkEvaluation( + StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId), + output) + } + + test("to_json - array with single empty row") { + val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val input = new GenericArrayData(InternalRow(null) :: Nil) + val output = """[{}]""" + checkEvaluation( + StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId), + output) + } + + test("to_json - empty array") { + val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val input = new GenericArrayData(Nil) + val output = """[]""" + checkEvaluation( + StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId), + output) + } + test("to_json null input column") { val schema = StructType(StructField("a", IntegerType) :: Nil) val struct = Literal.create(null, schema) checkEvaluation( - StructToJson(Map.empty, struct, gmtId), + StructsToJson(Map.empty, struct, gmtId), null ) } @@ -514,16 +541,16 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema) checkEvaluation( - StructToJson(Map.empty, struct, gmtId), + StructsToJson(Map.empty, struct, gmtId), """{"t":"2016-01-01T00:00:00.000Z"}""" ) checkEvaluation( - StructToJson(Map.empty, struct, Option("PST")), + StructsToJson(Map.empty, struct, Option("PST")), 
"""{"t":"2015-12-31T16:00:00.000-08:00"}""" ) checkEvaluation( - StructToJson( + StructsToJson( Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", DateTimeUtils.TIMEZONE_OPTION -> gmtId.get), struct, @@ -531,7 +558,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { """{"t":"2016-01-01T00:00:00"}""" ) checkEvaluation( - StructToJson( + StructsToJson( Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", DateTimeUtils.TIMEZONE_OPTION -> "PST"), struct, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 201f726db3..a9f089c850 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2978,7 +2978,8 @@ object functions { /** * (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType` - * with the specified schema. Returns `null`, in the case of an unparseable string. + * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable + * string. * * @param e a string column containing JSON data. * @param schema the schema to use when parsing the json string @@ -2989,7 +2990,7 @@ object functions { * @since 2.2.0 */ def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr { - JsonToStruct(schema, options, e.expr) + JsonToStructs(schema, options, e.expr) } /** @@ -3009,7 +3010,8 @@ object functions { /** * (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType` - * with the specified schema. Returns `null`, in the case of an unparseable string. + * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable + * string. * * @param e a string column containing JSON data. * @param schema the schema to use when parsing the json string @@ -3036,7 +3038,7 @@ object functions { from_json(e, schema, Map.empty[String, String]) /** - * Parses a column containing a JSON string into a `StructType` or `ArrayType` + * Parses a column containing a JSON string into a `StructType` or `ArrayType` of `StructType`s * with the specified schema. Returns `null`, in the case of an unparseable string. * * @param e a string column containing JSON data. @@ -3049,7 +3051,7 @@ object functions { from_json(e, schema, Map.empty[String, String]) /** - * Parses a column containing a JSON string into a `StructType` or `ArrayType` + * Parses a column containing a JSON string into a `StructType` or `ArrayType` of `StructType`s * with the specified schema. Returns `null`, in the case of an unparseable string. * * @param e a string column containing JSON data. @@ -3062,10 +3064,11 @@ object functions { from_json(e, DataType.fromJson(schema), options) /** - * (Scala-specific) Converts a column containing a `StructType` into a JSON string with the - * specified schema. Throws an exception, in the case of an unsupported type. + * (Scala-specific) Converts a column containing a `StructType` or `ArrayType` of `StructType`s + * into a JSON string with the specified schema. Throws an exception, in the case of an + * unsupported type. * - * @param e a struct column. + * @param e a column containing a struct or array of the structs. * @param options options to control how the struct column is converted into a json string. * accepts the same options and the json data source. 
    *
@@ -3073,14 +3076,15 @@ object functions {
    * @group collection_funcs
    * @since 2.1.0
    */
   def to_json(e: Column, options: Map[String, String]): Column = withExpr {
-    StructToJson(options, e.expr)
+    StructsToJson(options, e.expr)
   }
 
   /**
-   * (Java-specific) Converts a column containing a `StructType` into a JSON string with the
-   * specified schema. Throws an exception, in the case of an unsupported type.
+   * (Java-specific) Converts a column containing a `StructType` or `ArrayType` of `StructType`s
+   * into a JSON string with the specified schema. Throws an exception, in the case of an
+   * unsupported type.
    *
-   * @param e a struct column.
+   * @param e a column containing a struct or array of structs.
    * @param options options to control how the struct column is converted into a json string.
    *                accepts the same options and the json data source.
    *
@@ -3091,10 +3095,10 @@ object functions {
     to_json(e, options.asScala.toMap)
 
   /**
-   * Converts a column containing a `StructType` into a JSON string with the
-   * specified schema. Throws an exception, in the case of an unsupported type.
+   * Converts a column containing a `StructType` or `ArrayType` of `StructType`s into a JSON string
+   * with the specified schema. Throws an exception, in the case of an unsupported type.
    *
-   * @param e a struct column.
+   * @param e a column containing a struct or array of structs.
    *
    * @group collection_funcs
    * @since 2.1.0
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index 83243c5e5a..b3cc2cea51 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -3,6 +3,7 @@ describe function to_json;
 describe function extended to_json;
 select to_json(named_struct('a', 1, 'b', 2));
 select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
+select to_json(array(named_struct('a', 1, 'b', 2)));
 -- Check if errors handled
 select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'));
 select to_json(named_struct('a', 1, 'b', 2), map('mode', 1));
diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index b57cbbc1d8..315e1730ce 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 16
+-- Number of queries: 17
 
 
 -- !query 0
@@ -7,7 +7,7 @@ describe function to_json
 -- !query 0 schema
 struct<function_desc:string>
 -- !query 0 output
-Class: org.apache.spark.sql.catalyst.expressions.StructToJson
+Class: org.apache.spark.sql.catalyst.expressions.StructsToJson
 Function: to_json
 Usage: to_json(expr[, options]) - Returns a json string with a given struct value
 
 
@@ -17,13 +17,15 @@ describe function extended to_json
 -- !query 1 schema
 struct<function_desc:string>
 -- !query 1 output
-Class: org.apache.spark.sql.catalyst.expressions.StructToJson
+Class: org.apache.spark.sql.catalyst.expressions.StructsToJson
 Extended Usage:
     Examples:
       > SELECT to_json(named_struct('a', 1, 'b', 2));
        {"a":1,"b":2}
       > SELECT to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
        {"time":"26/08/2015"}
+      > SELECT to_json(array(named_struct('a', 1, 'b', 2)));
+       [{"a":1,"b":2}]
 
 Function:
to_json Usage: to_json(expr[, options]) - Returns a json string with a given struct value @@ -32,7 +34,7 @@ Usage: to_json(expr[, options]) - Returns a json string with a given struct valu -- !query 2 select to_json(named_struct('a', 1, 'b', 2)) -- !query 2 schema -struct<structtojson(named_struct(a, 1, b, 2)):string> +struct<structstojson(named_struct(a, 1, b, 2)):string> -- !query 2 output {"a":1,"b":2} @@ -40,54 +42,62 @@ struct<structtojson(named_struct(a, 1, b, 2)):string> -- !query 3 select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy')) -- !query 3 schema -struct<structtojson(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd'))):string> +struct<structstojson(named_struct(time, to_timestamp('2015-08-26', 'yyyy-MM-dd'))):string> -- !query 3 output {"time":"26/08/2015"} -- !query 4 -select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE')) +select to_json(array(named_struct('a', 1, 'b', 2))) -- !query 4 schema -struct<> +struct<structstojson(array(named_struct(a, 1, b, 2))):string> -- !query 4 output -org.apache.spark.sql.AnalysisException -Must use a map() function for options;; line 1 pos 7 +[{"a":1,"b":2}] -- !query 5 -select to_json(named_struct('a', 1, 'b', 2), map('mode', 1)) +select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE')) -- !query 5 schema struct<> -- !query 5 output org.apache.spark.sql.AnalysisException -A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7 +Must use a map() function for options;; line 1 pos 7 -- !query 6 -select to_json() +select to_json(named_struct('a', 1, 'b', 2), map('mode', 1)) -- !query 6 schema struct<> -- !query 6 output org.apache.spark.sql.AnalysisException -Invalid number of arguments for function to_json; line 1 pos 7 +A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7 -- !query 7 -describe function from_json +select to_json() -- !query 7 schema -struct<function_desc:string> +struct<> -- !query 7 output -Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct -Function: from_json -Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`. +org.apache.spark.sql.AnalysisException +Invalid number of arguments for function to_json; line 1 pos 7 -- !query 8 -describe function extended from_json +describe function from_json -- !query 8 schema struct<function_desc:string> -- !query 8 output -Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct +Class: org.apache.spark.sql.catalyst.expressions.JsonToStructs +Function: from_json +Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`. + + +-- !query 9 +describe function extended from_json +-- !query 9 schema +struct<function_desc:string> +-- !query 9 output +Class: org.apache.spark.sql.catalyst.expressions.JsonToStructs Extended Usage: Examples: > SELECT from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE'); @@ -99,36 +109,36 @@ Function: from_json Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`. 
--- !query 9 +-- !query 10 select from_json('{"a":1}', 'a INT') --- !query 9 schema -struct<jsontostruct({"a":1}):struct<a:int>> --- !query 9 output +-- !query 10 schema +struct<jsontostructs({"a":1}):struct<a:int>> +-- !query 10 output {"a":1} --- !query 10 +-- !query 11 select from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) --- !query 10 schema -struct<jsontostruct({"time":"26/08/2015"}):struct<time:timestamp>> --- !query 10 output +-- !query 11 schema +struct<jsontostructs({"time":"26/08/2015"}):struct<time:timestamp>> +-- !query 11 output {"time":2015-08-26 00:00:00.0} --- !query 11 +-- !query 12 select from_json('{"a":1}', 1) --- !query 11 schema +-- !query 12 schema struct<> --- !query 11 output +-- !query 12 output org.apache.spark.sql.AnalysisException Expected a string literal instead of 1;; line 1 pos 7 --- !query 12 +-- !query 13 select from_json('{"a":1}', 'a InvalidType') --- !query 12 schema +-- !query 13 schema struct<> --- !query 12 output +-- !query 13 output org.apache.spark.sql.AnalysisException DataType invalidtype() is not supported.(line 1, pos 2) @@ -139,28 +149,28 @@ a InvalidType ; line 1 pos 7 --- !query 13 +-- !query 14 select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE')) --- !query 13 schema +-- !query 14 schema struct<> --- !query 13 output +-- !query 14 output org.apache.spark.sql.AnalysisException Must use a map() function for options;; line 1 pos 7 --- !query 14 +-- !query 15 select from_json('{"a":1}', 'a INT', map('mode', 1)) --- !query 14 schema +-- !query 15 schema struct<> --- !query 14 output +-- !query 15 output org.apache.spark.sql.AnalysisException A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7 --- !query 15 +-- !query 16 select from_json() --- !query 15 schema +-- !query 16 schema struct<> --- !query 15 output +-- !query 16 output org.apache.spark.sql.AnalysisException Invalid number of arguments for function from_json; line 1 pos 7 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 2345b82081..170c238c53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -156,7 +156,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row(Seq(Row(1, "a"), Row(2, null), Row(null, null)))) } - test("to_json") { + test("to_json - struct") { val df = Seq(Tuple1(Tuple1(1))).toDF("a") checkAnswer( @@ -164,6 +164,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row("""{"_1":1}""") :: Nil) } + test("to_json - array") { + val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a") + + checkAnswer( + df.select(to_json($"a")), + Row("""[{"_1":1}]""") :: Nil) + } + test("to_json with option") { val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a") val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm") @@ -184,7 +192,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { "Unable to convert column a of type calendarinterval to JSON.")) } - test("roundtrip in to_json and from_json") { + test("roundtrip in to_json and from_json - struct") { val dfOne = Seq(Tuple1(Tuple1(1)), Tuple1(null)).toDF("struct") val schemaOne = dfOne.schema(0).dataType.asInstanceOf[StructType] val readBackOne = dfOne.select(to_json($"struct").as("json")) @@ -198,6 +206,20 
@@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(dfTwo, readBackTwo) } + test("roundtrip in to_json and from_json - array") { + val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: Nil)).toDF("array") + val schemaOne = dfOne.schema(0).dataType + val readBackOne = dfOne.select(to_json($"array").as("json")) + .select(from_json($"json", schemaOne).as("array")) + checkAnswer(dfOne, readBackOne) + + val dfTwo = Seq(Some("""[{"a":1}]"""), None).toDF("json") + val schemaTwo = ArrayType(StructType(StructField("a", IntegerType) :: Nil)) + val readBackTwo = dfTwo.select(from_json($"json", schemaTwo).as("array")) + .select(to_json($"array").as("json")) + checkAnswer(dfTwo, readBackTwo) + } + test("SPARK-19637 Support to_json in SQL") { val df1 = Seq(Tuple1(Tuple1(1))).toDF("a") checkAnswer( -- GitLab
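
As a usage reference (not part of the patch itself): a minimal spark-shell sketch of the roundtrip this change enables, assuming a build that includes the patch. The `spark` session and `$` interpolator come from the shell, and the column names `json` and `array` are illustrative; the expected outputs are taken from the PR description and test files above.

```scala
// Roundtrip an array of structs through from_json/to_json.
import org.apache.spark.sql.functions.{from_json, to_json}
import org.apache.spark.sql.types._
import spark.implicits._

val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))

// Parse a JSON array string into an array<struct<a:int>> column.
val df = Seq("""[{"a":1}, {"a":2}]""").toDF("json")
  .select(from_json($"json", schema).as("array"))

// Write the array column back out as a JSON array (the new code path).
df.select(to_json($"array").as("json")).show(false)
// +-----------------+
// |json             |
// +-----------------+
// |[{"a":1},{"a":2}]|
// +-----------------+

// The SQL function resolves to the same StructsToJson expression.
spark.sql("SELECT to_json(array(named_struct('a', 1, 'b', 2)))").show(false)
// [{"a":1,"b":2}]
```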