diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 2d7d0f903295614a701c7a8b697ab98adc5d4abd..cace0267013846b17a4fcbc492c2a99c648433d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -125,14 +125,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
           val namedQuery = visitNamedQuery(nCtx)
           (namedQuery.alias, namedQuery)
       }
-
       // Check for duplicate names.
-      ctes.groupBy(_._1).filter(_._2.size > 1).foreach {
-        case (name, _) =>
-          throw new ParseException(
-            s"Name '$name' is used for multiple common table expressions", ctx)
-      }
-
+      checkDuplicateKeys(ctes, ctx)
       With(query, ctes.toMap)
     }
   }
@@ -220,11 +214,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitPartitionSpec(
       ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
-    ctx.partitionVal.asScala.map { pVal =>
+    val parts = ctx.partitionVal.asScala.map { pVal =>
       val name = pVal.identifier.getText.toLowerCase
       val value = Option(pVal.constant).map(visitStringConstant)
       name -> value
-    }.toMap
+    }
+    // Check for duplicate partition columns in one spec.
+    checkDuplicateKeys(parts, ctx)
+    parts.toMap
   }
 
   /**
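
Note: with this change, visitPartitionSpec rejects a spec that names the same partition
column twice instead of silently keeping one value. A minimal standalone sketch of the
check as it applies to the collected (name, value) pairs; the values below are
illustrative, not taken from the patch:

    // Pairs as visitPartitionSpec would collect them for PARTITION (a='1', a='2').
    val parts: Seq[(String, Option[String])] = Seq("a" -> Some("1"), "a" -> Some("2"))
    // checkDuplicateKeys groups by column name and fails on any group of size > 1,
    // so this spec now raises: Found duplicate keys 'a'.
    val duplicated = parts.groupBy(_._1).filter(_._2.size > 1).keySet
    assert(duplicated == Set("a"))
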
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
index 58e2bdb6e24fbb20ce25301a45cedd971cb8f619..9619884edeafee242d0b99608bad308c161b6093 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
@@ -43,6 +43,13 @@ object ParserUtils {
     new ParseException(s"Operation not allowed: $message", ctx)
   }
 
+  /** Check if duplicate keys exist in a set of key-value pairs and throw a ParseException if so. */
+  def checkDuplicateKeys[T](keyPairs: Seq[(String, T)], ctx: ParserRuleContext): Unit = {
+    keyPairs.groupBy(_._1).filter(_._2.size > 1).foreach { case (key, _) =>
+      throw new ParseException(s"Found duplicate keys '$key'.", ctx)
+    }
+  }
+
   /** Get the code that creates the given node. */
   def source(ctx: ParserRuleContext): String = {
     val stream = ctx.getStart.getInputStream
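
The helper's behavior in isolation, as a hedged sketch using plain Scala collections
(no parser context; the ctx argument only attaches a source position to the exception):

    // groupBy keeps every pair under its key, so filtering on group size > 1
    // isolates exactly the duplicated keys.
    val pairs = Seq("k1" -> "1", "k2" -> "2", "k1" -> "3")
    val duplicates = pairs.groupBy(_._1).filter(_._2.size > 1).keySet
    // duplicates == Set("k1"); checkDuplicateKeys would throw
    // ParseException("Found duplicate keys 'k1'.", ctx) for one of them
    // (map iteration order is unspecified).
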
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 25d87d93bec42a8037431fd83cfab781d7405432..5811d32cd9e6290840b046d662553977fb80c5b9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -108,7 +108,7 @@ class PlanParserSuite extends PlanTest {
         "cte2" -> table("cte1").select(star())))
     intercept(
       "with cte1 (select 1), cte1 as (select 1 from cte1) select * from cte1",
-      "Name 'cte1' is used for multiple common table expressions")
+      "Found duplicate keys 'cte1'")
   }
 
   test("simple select query") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 2e3ac9706daf8cf003959452a6b22db93e282c79..c517b8b55fadb7ecd5e9c4263e9c9d947ffcf293 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -387,11 +387,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitTablePropertyList(
       ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
-    ctx.tableProperty.asScala.map { property =>
+    val properties = ctx.tableProperty.asScala.map { property =>
       val key = visitTablePropertyKey(property.key)
       val value = Option(property.value).map(string).orNull
       key -> value
-    }.toMap
+    }
+    // Check for duplicate property names.
+    checkDuplicateKeys(properties, ctx)
+    properties.toMap
   }
 
   /**
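
This follows the same pattern as the partition-spec change: collect the pairs first,
run the check, then build the map. A sketch of what the check sees for
SET TBLPROPERTIES ('key1' = '1', 'key1' = '2'); values are illustrative:

    // Pairs as visitTablePropertyList collects them before calling the helper.
    val properties = Seq("key1" -> "1", "key1" -> "2")
    // Without the check, .toMap would silently keep the last value ("2");
    // with it, parsing fails with: Found duplicate keys 'key1'.
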
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 708b878c843a701154c45e8ae5dfcf99e4330b3a..54f98a6232e915ba95cec9d7391afe4ba8269ae5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -665,6 +665,21 @@ class DDLCommandSuite extends PlanTest {
     assert(parsed.isInstanceOf[Project])
   }
 
+  test("duplicate keys in table properties") {
+    val e = intercept[ParseException] {
+      parser.parsePlan("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('key1' = '1', 'key1' = '2')")
+    }.getMessage
+    assert(e.contains("Found duplicate keys 'key1'"))
+  }
+
+  test("duplicate columns in partition specs") {
+    val e = intercept[ParseException] {
+      parser.parsePlan(
+        "ALTER TABLE dbx.tab1 PARTITION (a='1', a='2') RENAME TO PARTITION (a='100', a='200')")
+    }.getMessage
+    assert(e.contains("Found duplicate keys 'a'"))
+  }
+
   test("drop table") {
     val tableName1 = "db.tab"
     val tableName2 = "tab"
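
The corresponding non-duplicate statements still parse; a hedged usage sketch against
the suite's parser (assuming the same `parser` value the surrounding tests use):

    // No exception: every property key and partition column appears once.
    parser.parsePlan("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('key1' = '1', 'key2' = '2')")
    parser.parsePlan(
      "ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') RENAME TO PARTITION (a='100', b='200')")
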