diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 848c59e3b8a531206187181e02b0fbfad2f6b6e9..8ea8f766295ec4adbaa9ee3c464346e0ef739ac1 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -267,8 +267,8 @@ createFileFormat
     ;
 
 fileFormat
-    : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)?         #tableFileFormat
-    | identifier                                                                           #genericFileFormat
+    : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING    #tableFileFormat
+    | identifier                                             #genericFileFormat
     ;
 
 storageHandler
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c517b8b55fadb7ecd5e9c4263e9c9d947ffcf293..6e4af9500c3c4cd712dc1874d6f6376a39cb2037 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -796,14 +796,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    *
    * Expected format:
    * {{{
-   *   CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
-   *   [(col1 data_type [COMMENT col_comment], ...)]
+   *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   [(col1[:] data_type [COMMENT col_comment], ...)]
    *   [COMMENT table_comment]
-   *   [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
-   *   [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
-   *   [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]]
+   *   [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
    *   [ROW FORMAT row_format]
-   *   [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
+   *   [STORED AS file_format]
    *   [LOCATION path]
    *   [TBLPROPERTIES (property_name=property_value, ...)]
    *   [AS select_statement];
@@ -849,6 +847,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         compressed = false,
         serdeProperties = Map())
     }
+    validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
     val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
       .getOrElse(EmptyStorageFormat)
     val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
@@ -905,6 +904,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
   /**
    * Create a [[CatalogStorageFormat]] for creating tables.
+   *
+   * Format: STORED AS ...
    */
   override def visitCreateFileFormat(
       ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
@@ -932,9 +933,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
     EmptyStorageFormat.copy(
       inputFormat = Option(string(ctx.inFmt)),
-      outputFormat = Option(string(ctx.outFmt)),
-      serde = Option(ctx.serdeCls).map(string)
-    )
+      outputFormat = Option(string(ctx.outFmt)))
   }
 
   /**
@@ -1018,6 +1017,49 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     EmptyStorageFormat.copy(serdeProperties = entries.toMap)
   }
 
+  /**
+   * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT
+   * and STORED AS.
+   *
+   * The following are allowed. Anything else is not:
+   *   ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE]
+   *   ROW FORMAT DELIMITED ... STORED AS TEXTFILE
+   *   ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ...
+   */
+  private def validateRowFormatFileFormat(
+      rowFormatCtx: RowFormatContext,
+      createFileFormatCtx: CreateFileFormatContext,
+      parentCtx: ParserRuleContext): Unit = {
+    if (rowFormatCtx == null || createFileFormatCtx == null) {
+      return
+    }
+    (rowFormatCtx, createFileFormatCtx.fileFormat) match {
+      case (_, ffTable: TableFileFormatContext) => // OK
+      case (rfSerde: RowFormatSerdeContext, ffGeneric: GenericFileFormatContext) =>
+        ffGeneric.identifier.getText.toLowerCase match {
+          case ("sequencefile" | "textfile" | "rcfile") => // OK
+          case fmt =>
+            throw operationNotAllowed(
+              s"ROW FORMAT SERDE is incompatible with format '$fmt', which also specifies a serde",
+              parentCtx)
+        }
+      case (rfDelimited: RowFormatDelimitedContext, ffGeneric: GenericFileFormatContext) =>
+        ffGeneric.identifier.getText.toLowerCase match {
+          case "textfile" => // OK
+          case fmt => throw operationNotAllowed(
+            s"ROW FORMAT DELIMITED is only compatible with 'textfile', not '$fmt'", parentCtx)
+        }
+      case _ =>
+        // should never happen
+        def str(ctx: ParserRuleContext): String = {
+          (0 until ctx.getChildCount).map { i => ctx.getChild(i).getText }.mkString(" ")
+        }
+        throw operationNotAllowed(
+          s"Unexpected combination of ${str(rowFormatCtx)} and ${str(createFileFormatCtx)}",
+          parentCtx)
+    }
+  }
+
   /**
    * Create or replace a view. This creates a [[CreateViewCommand]] command.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 54f98a6232e915ba95cec9d7391afe4ba8269ae5..eab1f55712f6ff1f5ce4114823edfd63473fb1a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.reflect.{classTag, ClassTag}
+
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource}
 import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
@@ -25,9 +27,10 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsing}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
+
 // TODO: merge this with DDLSuite (SPARK-14441)
 class DDLCommandSuite extends PlanTest {
   private val parser = new SparkSqlParser(new SQLConf)
@@ -40,6 +43,15 @@ class DDLCommandSuite extends PlanTest {
     containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p.toLowerCase)) }
   }
 
+  private def parseAs[T: ClassTag](query: String): T = {
+    parser.parsePlan(query) match {
+      case t: T => t
+      case other =>
+        fail(s"Expected to parse ${classTag[T].runtimeClass} from query," +
+          s"got ${other.getClass.getName}: $query")
+    }
+  }
+
   test("create database") {
     val sql =
       """
@@ -225,19 +237,69 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed4, expected4)
   }
 
+  test("create table - row format and table file format") {
+    val createTableStart = "CREATE TABLE my_tab ROW FORMAT"
+    val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'"
+    val query1 = s"$createTableStart SERDE 'anything' $fileFormat"
+    val query2 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat"
+
+    // No conflicting serdes here, OK
+    val parsed1 = parseAs[CreateTableCommand](query1)
+    assert(parsed1.table.storage.serde == Some("anything"))
+    assert(parsed1.table.storage.inputFormat == Some("inputfmt"))
+    assert(parsed1.table.storage.outputFormat == Some("outputfmt"))
+    val parsed2 = parseAs[CreateTableCommand](query2)
+    assert(parsed2.table.storage.serde.isEmpty)
+    assert(parsed2.table.storage.inputFormat == Some("inputfmt"))
+    assert(parsed2.table.storage.outputFormat == Some("outputfmt"))
+  }
+
+  test("create table - row format serde and generic file format") {
+    val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
+    val supportedSources = Set("sequencefile", "rcfile", "textfile")
+
+    allSources.foreach { s =>
+      val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s"
+      if (supportedSources.contains(s)) {
+        val ct = parseAs[CreateTableCommand](query)
+        val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+        assert(hiveSerde.isDefined)
+        assert(ct.table.storage.serde == Some("anything"))
+        assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
+        assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+      } else {
+        assertUnsupported(query, Seq("row format serde", "incompatible", s))
+      }
+    }
+  }
+
+  test("create table - row format delimited and generic file format") {
+    val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
+    val supportedSources = Set("textfile")
+
+    allSources.foreach { s =>
+      val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s"
+      if (supportedSources.contains(s)) {
+        val ct = parseAs[CreateTableCommand](query)
+        val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+        assert(hiveSerde.isDefined)
+        assert(ct.table.storage.serde == hiveSerde.get.serde)
+        assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
+        assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+      } else {
+        assertUnsupported(query, Seq("row format delimited", "only compatible with 'textfile'", s))
+      }
+    }
+  }
+
   test("create external table - location must be specified") {
     assertUnsupported(
       sql = "CREATE EXTERNAL TABLE my_tab",
       containsThesePhrases = Seq("create external table", "location"))
     val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
-    parser.parsePlan(query) match {
-      case ct: CreateTableCommand =>
-        assert(ct.table.tableType == CatalogTableType.EXTERNAL)
-        assert(ct.table.storage.locationUri == Some("/something/anything"))
-      case other =>
-        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
-          s"got ${other.getClass.getName}: $query")
-    }
+    val ct = parseAs[CreateTableCommand](query)
+    assert(ct.table.tableType == CatalogTableType.EXTERNAL)
+    assert(ct.table.storage.locationUri == Some("/something/anything"))
   }
 
   test("create table - property values must be set") {
@@ -252,14 +314,9 @@ class DDLCommandSuite extends PlanTest {
 
   test("create table - location implies external") {
     val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
-    parser.parsePlan(query) match {
-      case ct: CreateTableCommand =>
-        assert(ct.table.tableType == CatalogTableType.EXTERNAL)
-        assert(ct.table.storage.locationUri == Some("/something/anything"))
-      case other =>
-        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
-            s"got ${other.getClass.getName}: $query")
-    }
+    val ct = parseAs[CreateTableCommand](query)
+    assert(ct.table.tableType == CatalogTableType.EXTERNAL)
+    assert(ct.table.storage.locationUri == Some("/something/anything"))
   }
 
   test("create table using - with partitioned by") {
@@ -551,8 +608,7 @@ class DDLCommandSuite extends PlanTest {
 
   test("alter table: set file format (not allowed)") {
     assertUnsupported(
-      "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
-        "OUTPUTFORMAT 'test' SERDE 'test'")
+      "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' OUTPUTFORMAT 'test'")
     assertUnsupported(
       "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
         "SET FILEFORMAT PARQUET")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 30ad392969b4e936b2d0fb7ce8c3328aa5d3eb0a..96c8fa6b70501e317332e2fa13bede9069345b3a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -61,7 +61,7 @@ class HiveDDLCommandSuite extends PlanTest {
         |country STRING COMMENT 'country of origination')
         |COMMENT 'This is the staging page view table'
         |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 'hour of the day')
-        |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' STORED AS RCFILE
+        |STORED AS RCFILE
         |LOCATION '/user/external/page_view'
         |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
         |AS SELECT * FROM src""".stripMargin
@@ -88,8 +88,6 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.partitionColumns ==
       CatalogColumn("dt", "string", comment = Some("date type")) ::
       CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
-    assert(desc.storage.serdeProperties ==
-      Map((serdeConstants.SERIALIZATION_FORMAT, "\u002C"), (serdeConstants.FIELD_DELIM, "\u002C")))
     assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
     assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
     assert(desc.storage.serde ==