-
- Downloads
[SPARK-22165][SQL] Fixes type conflicts between double, long, decimals, dates...
[SPARK-22165][SQL] Fixes type conflicts between double, long, decimals, dates and timestamps in partition column ## What changes were proposed in this pull request? This PR proposes to add a rule that re-uses `TypeCoercion.findWiderCommonType` when resolving type conflicts in partition values. Currently, this uses numeric precedence-like comparison; therefore, it looks introducing failures for type conflicts between timestamps, dates and decimals, please see: ```scala private val upCastingOrder: Seq[DataType] = Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType) ... literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_)) ``` The codes below: ```scala val df = Seq((1, "2015-01-01"), (2, "2016-01-01 00:00:00")).toDF("i", "ts") df.write.format("parquet").partitionBy("ts").save("/tmp/foo") spark.read.load("/tmp/foo").printSchema() val df = Seq((1, "1"), (2, "1" * 30)).toDF("i", "decimal") df.write.format("parquet").partitionBy("decimal").save("/tmp/bar") spark.read.load("/tmp/bar").printSchema() ``` produces output as below: **Before** ``` root |-- i: integer (nullable = true) |-- ts: date (nullable = true) root |-- i: integer (nullable = true) |-- decimal: integer (nullable = true) ``` **After** ``` root |-- i: integer (nullable = true) |-- ts: timestamp (nullable = true) root |-- i: integer (nullable = true) |-- decimal: decimal(30,0) (nullable = true) ``` ### Type coercion table: This PR proposes the type conflict resolusion as below: **Before** |InputA \ InputB|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`| |------------------------|----------|----------|----------|----------|----------|----------|----------|----------| |**`NullType`**|`StringType`|`IntegerType`|`LongType`|`StringType`|`DoubleType`|`StringType`|`StringType`|`StringType`| |**`IntegerType`**|`IntegerType`|`IntegerType`|`LongType`|`IntegerType`|`DoubleType`|`IntegerType`|`IntegerType`|`StringType`| |**`LongType`**|`LongType`|`LongType`|`LongType`|`LongType`|`DoubleType`|`LongType`|`LongType`|`StringType`| |**`DecimalType(38,0)`**|`StringType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DecimalType(38,0)`|`DecimalType(38,0)`|`StringType`| |**`DoubleType`**|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`StringType`| |**`DateType`**|`StringType`|`IntegerType`|`LongType`|`DateType`|`DoubleType`|`DateType`|`DateType`|`StringType`| |**`TimestampType`**|`StringType`|`IntegerType`|`LongType`|`TimestampType`|`DoubleType`|`TimestampType`|`TimestampType`|`StringType`| |**`StringType`**|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`| **After** |InputA \ InputB|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`| |------------------------|----------|----------|----------|----------|----------|----------|----------|----------| |**`NullType`**|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`| |**`IntegerType`**|`IntegerType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`StringType`|`StringType`|`StringType`| |**`LongType`**|`LongType`|`LongType`|`LongType`|`DecimalType(38,0)`|`StringType`|`StringType`|`StringType`|`StringType`| |**`DecimalType(38,0)`**|`DecimalType(38,0)`|`DecimalType(38,0)`|`DecimalType(38,0)`|`DecimalType(38,0)`|`StringType`|`StringType`|`StringType`|`StringType`| |**`DoubleType`**|`DoubleType`|`DoubleType`|`StringType`|`StringType`|`DoubleType`|`StringType`|`StringType`|`StringType`| |**`DateType`**|`DateType`|`StringType`|`StringType`|`StringType`|`StringType`|`DateType`|`TimestampType`|`StringType`| |**`TimestampType`**|`TimestampType`|`StringType`|`StringType`|`StringType`|`StringType`|`TimestampType`|`TimestampType`|`StringType`| |**`StringType`**|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`| This was produced by: ```scala test("Print out chart") { val supportedTypes: Seq[DataType] = Seq( NullType, IntegerType, LongType, DecimalType(38, 0), DoubleType, DateType, TimestampType, StringType) // Old type conflict resolution: val upCastingOrder: Seq[DataType] = Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType) def oldResolveTypeConflicts(dataTypes: Seq[DataType]): DataType = { val topType = dataTypes.maxBy(upCastingOrder.indexOf(_)) if (topType == NullType) StringType else topType } println(s"|InputA \\ InputB|${supportedTypes.map(dt => s"`${dt.toString}`").mkString("|")}|") println(s"|------------------------|${supportedTypes.map(_ => "----------").mkString("|")}|") supportedTypes.foreach { inputA => val types = supportedTypes.map(inputB => oldResolveTypeConflicts(Seq(inputA, inputB))) println(s"|**`$inputA`**|${types.map(dt => s"`${dt.toString}`").mkString("|")}|") } // New type conflict resolution: def newResolveTypeConflicts(dataTypes: Seq[DataType]): DataType = { dataTypes.fold[DataType](NullType)(findWiderTypeForPartitionColumn) } println(s"|InputA \\ InputB|${supportedTypes.map(dt => s"`${dt.toString}`").mkString("|")}|") println(s"|------------------------|${supportedTypes.map(_ => "----------").mkString("|")}|") supportedTypes.foreach { inputA => val types = supportedTypes.map(inputB => newResolveTypeConflicts(Seq(inputA, inputB))) println(s"|**`$inputA`**|${types.map(dt => s"`${dt.toString}`").mkString("|")}|") } } ``` ## How was this patch tested? Unit tests added in `ParquetPartitionDiscoverySuite`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #19389 from HyukjinKwon/partition-type-coercion.
Showing
- docs/sql-programming-guide.md 139 additions, 0 deletionsdocs/sql-programming-guide.md
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala 1 addition, 1 deletion...org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala 40 additions, 20 deletions...e/spark/sql/execution/datasources/PartitioningUtils.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala 55 additions, 2 deletions.../datasources/parquet/ParquetPartitionDiscoverySuite.scala
Loading
Please register or sign in to comment