diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index 70062eae3b7ceb1c91cd110bdd452451641711c8..873221835daf8d8f04a49f4f9df3a715f6ca849e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -68,8 +68,15 @@ private[sql] object JsonRDD extends Logging { val (topLevel, structLike) = values.partition(_.size == 1) val topLevelFields = topLevel.filter { name => resolved.get(prefix ++ name).get match { - case ArrayType(StructType(Nil), _) => false - case ArrayType(_, _) => true + case ArrayType(elementType, _) => { + def hasInnerStruct(t: DataType): Boolean = t match { + case s: StructType => false + case ArrayType(t1, _) => hasInnerStruct(t1) + case o => true + } + + hasInnerStruct(elementType) + } case struct: StructType => false case _ => true } @@ -84,7 +91,18 @@ private[sql] object JsonRDD extends Logging { val dataType = resolved.get(prefix :+ name).get dataType match { case array: ArrayType => - Some(StructField(name, ArrayType(structType, array.containsNull), nullable = true)) + // The pattern of this array is ArrayType(...(ArrayType(StructType))). + // Since the inner struct of array is a placeholder (StructType(Nil)), + // we need to replace this placeholder with the actual StructType (structType). + def getActualArrayType( + innerStruct: StructType, + currentArray: ArrayType): ArrayType = currentArray match { + case ArrayType(s: StructType, containsNull) => + ArrayType(innerStruct, containsNull) + case ArrayType(a: ArrayType, containsNull) => + ArrayType(getActualArrayType(innerStruct, a), containsNull) + } + Some(StructField(name, getActualArrayType(structType, array), nullable = true)) case struct: StructType => Some(StructField(name, structType, nullable = true)) // dataType is StringType means that we have resolved type conflicts involving // primitive types and complex types. So, the type of name has been relaxed to @@ -168,8 +186,7 @@ private[sql] object JsonRDD extends Logging { /** * Returns the element type of an JSON array. We go through all elements of this array * to detect any possible type conflict. We use [[compatibleType]] to resolve - * type conflicts. Right now, when the element of an array is another array, we - * treat the element as String. + * type conflicts. */ private def typeOfArray(l: Seq[Any]): ArrayType = { val containsNull = l.exists(v => v == null) @@ -216,18 +233,24 @@ private[sql] object JsonRDD extends Logging { } case (key: String, array: Seq[_]) => { // The value associated with the key is an array. - typeOfArray(array) match { + // Handle inner structs of an array. + def buildKeyPathForInnerStructs(v: Any, t: DataType): Seq[(String, DataType)] = t match { case ArrayType(StructType(Nil), containsNull) => { // The elements of this arrays are structs. - array.asInstanceOf[Seq[Map[String, Any]]].flatMap { + v.asInstanceOf[Seq[Map[String, Any]]].flatMap { element => allKeysWithValueTypes(element) }.map { - case (k, dataType) => (s"$key.$k", dataType) - } :+ (key, ArrayType(StructType(Nil), containsNull)) + case (k, t) => (s"$key.$k", t) + } } - case ArrayType(elementType, containsNull) => - (key, ArrayType(elementType, containsNull)) :: Nil + case ArrayType(t1, containsNull) => + v.asInstanceOf[Seq[Any]].flatMap { + element => buildKeyPathForInnerStructs(element, t1) + } + case other => Nil } + val elementType = typeOfArray(array) + buildKeyPathForInnerStructs(array, elementType) :+ (key, elementType) } case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil } @@ -339,8 +362,6 @@ private[sql] object JsonRDD extends Logging { null } else { desiredType match { - case ArrayType(elementType, _) => - value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType)) case StringType => toString(value) case IntegerType => value.asInstanceOf[IntegerType.JvmType] case LongType => toLong(value) @@ -348,6 +369,10 @@ private[sql] object JsonRDD extends Logging { case DecimalType => toDecimal(value) case BooleanType => value.asInstanceOf[BooleanType.JvmType] case NullType => null + + case ArrayType(elementType, _) => + value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType)) + case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct) } } } @@ -356,22 +381,9 @@ private[sql] object JsonRDD extends Logging { // TODO: Reuse the row instead of creating a new one for every record. val row = new GenericMutableRow(schema.fields.length) schema.fields.zipWithIndex.foreach { - // StructType - case (StructField(name, fields: StructType, _), i) => - row.update(i, json.get(name).flatMap(v => Option(v)).map( - v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull) - - // ArrayType(StructType) - case (StructField(name, ArrayType(structType: StructType, _), _), i) => - row.update(i, - json.get(name).flatMap(v => Option(v)).map( - v => v.asInstanceOf[Seq[Any]].map( - e => asRow(e.asInstanceOf[Map[String, Any]], structType))).orNull) - - // Other cases case (StructField(name, dataType, _), i) => row.update(i, json.get(name).flatMap(v => Option(v)).map( - enforceCorrectType(_, dataType)).getOrElse(null)) + enforceCorrectType(_, dataType)).orNull) } row diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index 301d482d27d86262c7456c12c55aa4eafea71a06..b50d93855405a3c932da7927d8a73d6b99a9a75c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -591,8 +591,35 @@ class JsonSuite extends QueryTest { (true, "str1") :: Nil ) checkAnswer( - sql("select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1] from jsonTable"), + sql( + """ + |select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1] + |from jsonTable + """.stripMargin), ("str2", 6) :: Nil ) } + + test("SPARK-3390 Complex arrays") { + val jsonSchemaRDD = jsonRDD(complexFieldAndType2) + jsonSchemaRDD.registerTempTable("jsonTable") + + checkAnswer( + sql( + """ + |select arrayOfArray1[0][0][0], arrayOfArray1[1][0][1], arrayOfArray1[1][1][0] + |from jsonTable + """.stripMargin), + (5, 7, 8) :: Nil + ) + checkAnswer( + sql( + """ + |select arrayOfArray2[0][0][0].inner1, arrayOfArray2[1][0], + |arrayOfArray2[1][1][1].inner2[0], arrayOfArray2[2][0][0].inner3[0][0].inner4 + |from jsonTable + """.stripMargin), + ("str1", Nil, "str4", 2) :: Nil + ) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala index b3f95f08e80442158b254c2b75c2208eb314a2ae..5f0b3959a63adfa2219f3c0919999693c2d1fbcb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala @@ -106,6 +106,34 @@ object TestJsonData { "inner1": "str4" }], "field2": [[5, 6], [7, 8]] - }] + }], + "arrayOfArray1": [ + [ + [5] + ], + [ + [6, 7], + [8] + ]], + "arrayOfArray2": [ + [ + [ + { + "inner1": "str1" + } + ] + ], + [ + [], + [ + {"inner2": ["str3", "str33"]}, + {"inner2": ["str4"], "inner1": "str11"} + ] + ], + [ + [ + {"inner3": [[{"inner4": 2}]]} + ] + ]] }""" :: Nil) }