From 0c88e8d37224713199ca5661c2cd57f5918dcb9a Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Wed, 14 Jun 2017 16:28:06 +0800
Subject: [PATCH] [SPARK-21085][SQL] Failed to read the partitioned table
 created by Spark 2.1

### What changes were proposed in this pull request?
Before this PR, Spark was unable to read a partitioned table created by Spark 2.1 when the table schema did not put the partitioning columns at the end of the schema, because restoring the table tripped the following assertion:
[assert(partitionFields.map(_.name) == partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236)

When restoring the table metadata from the metastore, we now reorder the columns in the schema read from the table properties so that the partition columns come last.
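
For illustration, here is a standalone sketch of the reordering performed by the new `reorderSchema` helper (the example schema and the error handling are hypothetical; the real helper in the diff below throws an `AnalysisException`):

```scala
import org.apache.spark.sql.types.StructType

// Move the named partition columns to the end of the schema that was read
// from the table properties, so the CatalogTable assertion holds.
def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
  val partitionFields = partColumnNames.map { partCol =>
    schema.find(_.name == partCol).getOrElse(
      sys.error(s"Partition column $partCol not found in ${schema.catalogString}"))
  }
  StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
}

// A partitioned table created by Spark 2.1 may persist a schema like this:
val legacySchema = new StructType()
  .add("col1", "int")
  .add("partCol1", "int")
  .add("col2", "string")

reorderSchema(legacySchema, Seq("partCol1")).fieldNames
// => Array(col1, col2, partCol1)
```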

### How was this patch tested?
Added test cases to check both Hive-serde and data source tables.
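Both cases create a table whose persisted schema interleaves data and partition columns and verify that the schema restored by `externalCatalog.getTable` ends with the partition columns (see `HiveExternalCatalogSuite` in the diff below).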

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18295 from gatorsmile/reorderReadSchema.
---
 .../spark/sql/hive/HiveExternalCatalog.scala  | 31 ++++++++++++++++---
 .../sql/hive/HiveExternalCatalogSuite.scala   | 26 ++++++++++++++++
 2 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 7fcf06d66b..19453679a3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -729,6 +729,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition
+  // columns were not required to be at the end of the schema. We need to reorder them when
+  // reading the schema from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
+    val partitionFields = partColumnNames.map { partCol =>
+      schema.find(_.name == partCol).getOrElse {
+        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
+          s"partition column names from the schema. schema: ${schema.catalogString}. " +
+          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+      }
+    }
+    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
@@ -738,10 +752,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
       val schemaFromTableProps = getSchemaFromTableProperties(table)
-      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      val partColumnNames = getPartitionColumnsFromTableProperties(table)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
         hiveTable.copy(
-          schema = schemaFromTableProps,
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          schema = reorderedSchema,
+          partitionColumnNames = partColumnNames,
           bucketSpec = getBucketSpecFromTableProperties(table))
       } else {
         // Hive metastore may change the table schema, e.g. schema inference. If the table
@@ -771,11 +788,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val partColumnNames = getPartitionColumnsFromTableProperties(table)
+    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
     table.copy(
       provider = Some(provider),
       storage = storageWithLocation,
-      schema = getSchemaFromTableProperties(table),
-      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+      schema = reorderedSchema,
+      partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index bd54c043c6..d43534d591 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -63,4 +63,30 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
     assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl")))
   }
+
+  Seq("parquet", "hive").foreach { format =>
+    test(s"Partition columns should be put at the end of table schema for the format $format") {
+      val catalog = newBasicCatalog()
+      val newSchema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string")
+      val table = CatalogTable(
+        identifier = TableIdentifier("tbl", Some("db1")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType()
+          .add("col1", "int")
+          .add("partCol1", "int")
+          .add("partCol2", "string")
+          .add("col2", "string"),
+        provider = Some(format),
+        partitionColumnNames = Seq("partCol1", "partCol2"))
+      catalog.createTable(table, ignoreIfExists = false)
+
+      val restoredTable = externalCatalog.getTable("db1", "tbl")
+      assert(restoredTable.schema == newSchema)
+    }
+  }
 }
-- 
GitLab