diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 7fcf06d66b5eabb8d8d0c75cbe0e0332a06d044a..19453679a30df71dfc01cb84b0c546db39e20ed5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -729,6 +729,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder the table schema to put partition columns at the end. Before Spark 2.2, partition
+  // columns were not guaranteed to be at the end of the schema, so we need to reorder the
+  // schema when reading it back from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
+    val partitionFields = partColumnNames.map { partCol =>
+      schema.find(_.name == partCol).getOrElse {
+        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
+          s"partition column names in the schema: ${schema.catalogString}. " +
+          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+      }
+    }
+    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
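
A minimal standalone sketch of what reorderSchema does (ReorderSchemaDemo below is a
hypothetical illustration mirroring the logic above, not part of this patch):

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object ReorderSchemaDemo {
      // Mirror of reorderSchema: move the named partition columns to the end,
      // preserving the relative order of the remaining data columns.
      def reorder(schema: StructType, partColumnNames: Seq[String]): StructType = {
        val partitionFields = partColumnNames.map { partCol =>
          schema.find(_.name == partCol).getOrElse(
            sys.error(s"Partition column $partCol not found in ${schema.catalogString}"))
        }
        StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
      }

      def main(args: Array[String]): Unit = {
        // Pre-2.2 layout: partition columns interleaved with data columns.
        val legacy = StructType(Seq(
          StructField("col1", IntegerType),
          StructField("partCol1", IntegerType),
          StructField("partCol2", StringType),
          StructField("col2", StringType)))
        // Prints: struct<col1:int,col2:string,partCol1:int,partCol2:string>
        println(reorder(legacy, Seq("partCol1", "partCol2")).catalogString)
      }
    }
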
@@ -738,10 +752,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
       val schemaFromTableProps = getSchemaFromTableProperties(table)
-      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      val partColumnNames = getPartitionColumnsFromTableProperties(table)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
         hiveTable.copy(
-          schema = schemaFromTableProps,
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          schema = reorderedSchema,
+          partitionColumnNames = partColumnNames,
           bucketSpec = getBucketSpecFromTableProperties(table))
       } else {
         // Hive metastore may change the table schema, e.g. schema inference. If the table
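
The check above compares the reordered properties schema against the schema Hive
returned, ignoring case and nullability. A simplified illustration of that kind of
comparison (this is not the actual DataType.equalsIgnoreCaseAndNullability
implementation, which also recurses into nested struct, array, and map types):

    import org.apache.spark.sql.types.StructType

    // Simplified check: field names match case-insensitively, nullability is
    // ignored, and nested types are compared by plain equality.
    def roughlyEqualIgnoreCaseAndNullability(a: StructType, b: StructType): Boolean =
      a.length == b.length && a.fields.zip(b.fields).forall { case (fa, fb) =>
        fa.name.equalsIgnoreCase(fb.name) && fa.dataType == fb.dataType
      }
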
@@ -771,11 +788,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val partColumnNames = getPartitionColumnsFromTableProperties(table)
+    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
     table.copy(
       provider = Some(provider),
       storage = storageWithLocation,
-      schema = getSchemaFromTableProperties(table),
-      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+      schema = reorderedSchema,
+      partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
   }
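
For context on where schemaFromTableProps comes from: the schema is persisted as JSON
split across table properties. A hedged sketch of the reassembly (illustrative only;
the key names follow the spark.sql.sources.schema.* convention used in this file, and
the corruption checks of the real getSchemaFromTableProperties are omitted):

    import org.apache.spark.sql.types.{DataType, StructType}

    // Illustrative reassembly of a schema stored across table properties as
    // spark.sql.sources.schema.numParts plus spark.sql.sources.schema.part.0..N-1,
    // each part holding a fragment of the schema's JSON representation.
    def schemaFromProps(props: Map[String, String]): StructType = {
      val numParts = props("spark.sql.sources.schema.numParts").toInt
      val json = (0 until numParts)
        .map(i => props(s"spark.sql.sources.schema.part.$i"))
        .mkString
      DataType.fromJson(json).asInstanceOf[StructType]
    }
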
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index bd54c043c6ec44c4e2feb30309c94f9aa38036b2..d43534d5914d141bee79a77a738acb2df903a9ee 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -63,4 +63,30 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
     assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl")))
   }
+
+  Seq("parquet", "hive").foreach { format =>
+    test(s"Partition columns should be put at the end of table schema for the format $format") {
+      val catalog = newBasicCatalog()
+      val newSchema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string")
+      val table = CatalogTable(
+        identifier = TableIdentifier("tbl", Some("db1")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType()
+          .add("col1", "int")
+          .add("partCol1", "int")
+          .add("partCol2", "string")
+          .add("col2", "string"),
+        provider = Some(format),
+        partitionColumnNames = Seq("partCol1", "partCol2"))
+      catalog.createTable(table, ignoreIfExists = false)
+
+      val restoredTable = catalog.getTable("db1", "tbl")
+      assert(restoredTable.schema == newSchema)
+    }
+  }
 }
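
The same invariant is visible end to end from SQL: since Spark 2.2, partition columns
are also placed last when a table is created, so a freshly created table and a legacy
table restored through the code above report the same layout. A hedged spark-shell
sketch (demo_tbl is a hypothetical table name; assumes a SparkSession with Hive
support):

    spark.sql(
      """CREATE TABLE demo_tbl (col1 INT, partCol1 INT, partCol2 STRING, col2 STRING)
        |USING parquet
        |PARTITIONED BY (partCol1, partCol2)""".stripMargin)

    // The catalog reports the partition columns at the end of the schema:
    // col1, col2, partCol1, partCol2
    spark.table("demo_tbl").printSchema()
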