diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 4cd21aef9123581a29a585e254f0e713bf8bb228..0f6e3446559b534beb0cd577d78be4d9f5779bb6 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -522,14 +522,11 @@ Hive metastore. Persistent tables will still exist even after your Spark program
 long as you maintain your connection to the same metastore. A DataFrame for a persistent table can
 be created by calling the `table` method on a `SparkSession` with the name of the table.
 
-By default `saveAsTable` will create a "managed table", meaning that the location of the data will
-be controlled by the metastore. Managed tables will also have their data deleted automatically
-when a table is dropped.
-
-Currently, `saveAsTable` does not expose an API supporting the creation of an "external table" from a `DataFrame`.
-However, this functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key
-and location of the external table as its value (a string) when saving the table with `saveAsTable`. When an External table
-is dropped only its metadata is removed.
+For file-based data sources, e.g. text, parquet, json, etc., you can specify a custom table path via the
+`path` option, e.g. `df.write.option("path", "/some/path").saveAsTable("t")`. When the table is dropped,
+the custom table path will not be removed and the table data will still be there. If no custom table path is
+specified, Spark will write data to a default table path under the warehouse directory. When the table is
+dropped, the default table path will be removed too.
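+
+As a minimal sketch of the difference (assuming a DataFrame `df`; the table names are illustrative):
+
+{% highlight scala %}
+// Managed table: data is written under the warehouse directory and deleted when the table is dropped.
+df.write.saveAsTable("managed_t")
+
+// Custom table path: dropping the table removes only the metadata; the files at the path remain.
+df.write.option("path", "/some/path").saveAsTable("external_t")
+{% endhighlight %}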
 
 Starting from Spark 2.1, persistent datasource tables have per-partition metadata stored in the Hive metastore. This brings several benefits:
 
@@ -954,6 +951,53 @@ adds support for finding tables in the MetaStore and writing queries using HiveQ
 </div>
 </div>
 
+### Specifying storage format for Hive tables
+
+When you create a Hive table, you need to define how this table should read/write data from/to the file system,
+i.e. the "input format" and "output format". You also need to define how this table should deserialize the data
+to rows, or serialize rows to data, i.e. the "serde". The following options can be used to specify the storage
+format ("serde", "input format", "output format"), e.g. `CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')`.
+By default, we will read the table files as plain text. Note that the Hive storage handler is not supported yet when
+creating a table; you can create a table using a storage handler on the Hive side and use Spark SQL to read it.
+
+<table class="table">
+  <tr><th>Property Name</th><th>Meaning</th></tr>
+  <tr>
+    <td><code>fileFormat</code></td>
+    <td>
+      A fileFormat is a bundle of storage format specifications, including "serde", "input format" and
+      "output format". Currently we support 6 fileFormats: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>inputFormat, outputFormat</code></td>
+    <td>
+      These 2 options specify the name of a corresponding `InputFormat` and `OutputFormat` class as a string literal,
+      e.g. `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`. These 2 options must be specified as a pair, and you cannot
+      specify them if you have already specified the `fileFormat` option.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>serde</code></td>
+    <td>
+      This option specifies the name of a serde class. When the `fileFormat` option is specified, do not specify this option
+      if the given `fileFormat` already includes the serde information. Currently "sequencefile", "textfile" and "rcfile"
+      don't include the serde information and you can use this option with these 3 fileFormats.
+    </td>
+  </tr>
+
+  <tr>
+    <td><code>fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim</code></td>
+    <td>
+      These options can only be used with the "textfile" fileFormat. They define how to read delimited files into rows.
+    </td>
+  </tr>
+</table>
+
+All other properties defined with `OPTIONS` will be regarded as Hive serde properties.
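+
+For example, a minimal sketch (assuming a `SparkSession` named `spark`; the table name is illustrative,
+and `compression` falls through to the Hive serde properties since it is not one of the options above):
+
+{% highlight scala %}
+// Creates a Hive ORC table; 'compression' is stored as a Hive serde property.
+spark.sql("CREATE TABLE mytable(id int) USING hive OPTIONS(fileFormat 'orc', compression 'Zlib')")
+{% endhighlight %}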
+
 ### Interacting with Different Versions of Hive Metastore
 
 One of the most important pieces of Spark SQL's Hive support is interaction with Hive metastore,
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
index 052153c9e9736fb9b0b3d177a01c782b9c1c1394..8d06d38cf2c6b1ab8f20991a8f173f0a5574c086 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
@@ -64,7 +64,7 @@ public class JavaSparkHiveExample {
       .enableHiveSupport()
       .getOrCreate();
 
-    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
+    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
     spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
 
     // Queries are expressed in HiveQL
diff --git a/examples/src/main/python/sql/hive.py b/examples/src/main/python/sql/hive.py
index ad83fe1cf14b5e7d4ee2ff3567e82194af1f1939..ba01544a5bd2e7cb75a755588355029cf785ccbd 100644
--- a/examples/src/main/python/sql/hive.py
+++ b/examples/src/main/python/sql/hive.py
@@ -44,7 +44,7 @@ if __name__ == "__main__":
         .getOrCreate()
 
     # spark is an existing SparkSession
-    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
     spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
 
     # Queries are expressed in HiveQL
diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R
index 373a36dba14f0c52b566f860ce421eb611b00214..e647f0e1e9f174e23d5f28812a33311c2204cf69 100644
--- a/examples/src/main/r/RSparkSQLExample.R
+++ b/examples/src/main/r/RSparkSQLExample.R
@@ -195,7 +195,7 @@ head(teenagers)
 # $example on:spark_hive$
 # enableHiveSupport defaults to TRUE
 sparkR.session(enableHiveSupport = TRUE)
-sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
 sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
 
 # Queries can be expressed in HiveQL.
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
index ded18dacf1fe3df4871d2357c50357857beb1175..d29ed958fe8f4f527fd7b7e852e9672c37b5357b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -50,7 +50,7 @@ object SparkHiveExample {
     import spark.implicits._
     import spark.sql
 
-    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
     sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
 
     // Queries are expressed in HiveQL
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index a34087cb6cd4a2a6b25b417ef0498e642a2a92cf..3222a9cdc2c4ed42d6e4b967a38ca7d15612de3c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -69,16 +69,18 @@ statement
     | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList     #setDatabaseProperties
     | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)?      #dropDatabase
     | createTableHeader ('(' colTypeList ')')? tableProvider
-        (OPTIONS tablePropertyList)?
+        (OPTIONS options=tablePropertyList)?
         (PARTITIONED BY partitionColumnNames=identifierList)?
-        bucketSpec? (AS? query)?                                       #createTableUsing
+        bucketSpec? locationSpec?
+        (COMMENT comment=STRING)?
+        (AS? query)?                                                   #createTable
     | createTableHeader ('(' columns=colTypeList ')')?
-        (COMMENT STRING)?
+        (COMMENT comment=STRING)?
         (PARTITIONED BY '(' partitionColumns=colTypeList ')')?
         bucketSpec? skewSpec?
         rowFormat?  createFileFormat? locationSpec?
         (TBLPROPERTIES tablePropertyList)?
-        (AS? query)?                                                   #createTable
+        (AS? query)?                                                   #createHiveTable
     | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
         LIKE source=tableIdentifier                                    #createTableLike
     | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
index a7f7a8a66382ac22684a69cac6ad16a23bf97ed3..29e49a58375b9adf153ddf2a5cf468a7e54b4517 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -27,6 +27,8 @@ class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
 
   override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
 
+  override def contains(k: String): Boolean = baseMap.contains(k.toLowerCase)
+
   override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
     baseMap + kv.copy(_1 = kv._1.toLowerCase)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 14a983e43b005ebd5c7615351142b66b9d807fc5..41768d451261a97ac88e1e1455be5653f52c046f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -342,11 +342,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a data source table, returning a [[CreateTable]] logical plan.
+   * Create a table, returning a [[CreateTable]] logical plan.
    *
    * Expected format:
    * {{{
-   *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
    *   USING table_provider
    *   [OPTIONS table_property_list]
    *   [PARTITIONED BY (col_name, col_name, ...)]
@@ -354,19 +354,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    *    [SORTED BY (col_name [ASC|DESC], ...)]
    *    INTO num_buckets BUCKETS
    *   ]
+   *   [LOCATION path]
+   *   [COMMENT table_comment]
    *   [AS select_statement];
    * }}}
    */
-  override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
     if (external) {
       operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
     }
-    val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
-    if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) {
-      throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING")
-    }
     val schema = Option(ctx.colTypeList()).map(createSchema)
     val partitionColumnNames =
       Option(ctx.partitionColumnNames)
@@ -374,10 +373,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         .getOrElse(Array.empty[String])
     val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
-    // TODO: this may be wrong for non file-based data source like JDBC, which should be external
-    // even there is no `path` in options. We should consider allow the EXTERNAL keyword.
+    val location = Option(ctx.locationSpec).map(visitLocationSpec)
     val storage = DataSource.buildStorageFormatFromOptions(options)
-    val tableType = if (storage.locationUri.isDefined) {
+
+    if (location.isDefined && storage.locationUri.isDefined) {
+      throw new ParseException(
+        "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
+          "you can only specify one of them.", ctx)
+    }
+    val customLocation = storage.locationUri.orElse(location)
+
+    val tableType = if (customLocation.isDefined) {
       CatalogTableType.EXTERNAL
     } else {
       CatalogTableType.MANAGED
@@ -386,12 +392,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val tableDesc = CatalogTable(
       identifier = table,
       tableType = tableType,
-      storage = storage,
+      storage = storage.copy(locationUri = customLocation),
       schema = schema.getOrElse(new StructType),
       provider = Some(provider),
       partitionColumnNames = partitionColumnNames,
-      bucketSpec = bucketSpec
-    )
+      bucketSpec = bucketSpec,
+      comment = Option(ctx.comment).map(string))
 
     // Determine the storage mode.
     val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
@@ -1011,10 +1017,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a table, returning a [[CreateTable]] logical plan.
+   * Create a Hive serde table, returning a [[CreateTable]] logical plan.
    *
-   * This is not used to create datasource tables, which is handled through
-   * "CREATE TABLE ... USING ...".
+   * This is a legacy syntax for Hive compatibility; we recommend that users use the Spark SQL
+   * CREATE TABLE syntax to create Hive serde tables, e.g. "CREATE TABLE ... USING hive ..."
    *
    * Note: several features are currently not supported - temporary tables, bucketing,
    * skewed columns and storage handlers (STORED BY).
@@ -1032,7 +1038,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    *   [AS select_statement];
    * }}}
    */
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
+  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) {
     val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
     // TODO: implement temporary tables
     if (temp) {
@@ -1046,7 +1052,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (ctx.bucketSpec != null) {
       operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
     }
-    val comment = Option(ctx.STRING).map(string)
     val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
     val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
     val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
@@ -1057,19 +1062,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val schema = StructType(dataCols ++ partitionCols)
 
     // Storage format
-    val defaultStorage: CatalogStorageFormat = {
-      val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
-      val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
-      CatalogStorageFormat(
-        locationUri = None,
-        inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
-          .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
-        outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
-          .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-        serde = defaultHiveSerde.flatMap(_.serde),
-        compressed = false,
-        properties = Map())
-    }
+    val defaultStorage = HiveSerDe.getDefaultStorage(conf)
     validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
     val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
       .getOrElse(CatalogStorageFormat.empty)
@@ -1104,7 +1097,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       provider = Some(DDLUtils.HIVE_PROVIDER),
       partitionColumnNames = partitionCols.map(_.name),
       properties = properties,
-      comment = comment)
+      comment = Option(ctx.comment).map(string))
 
     val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 28808f8e3ee145f9b47f16b6b4a7f71c01bc0eb5..1257d1728c3115fa3ad2c0251c4e2d7337939c35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -408,8 +408,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTable(tableDesc, mode, None)
-        if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
+      case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
         val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
 
@@ -421,8 +420,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
       // `CreateTables`
 
-      case CreateTable(tableDesc, mode, Some(query))
-        if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER =>
+      case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isDatasourceTable(tableDesc) =>
         val cmd =
           CreateDataSourceTableAsSelectCommand(
             tableDesc,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 59a29e884739a89a02a5a209d3c96c56f81250e2..82cbb4aa474453bcf1513ad935a3318bf0a25eaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -762,8 +762,12 @@ case class AlterTableSetLocationCommand(
 object DDLUtils {
   val HIVE_PROVIDER = "hive"
 
+  def isHiveTable(table: CatalogTable): Boolean = {
+    table.provider.isDefined && table.provider.get.toLowerCase == HIVE_PROVIDER
+  }
+
   def isDatasourceTable(table: CatalogTable): Boolean = {
-    table.provider.isDefined && table.provider.get != HIVE_PROVIDER
+    table.provider.isDefined && table.provider.get.toLowerCase != HIVE_PROVIDER
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 07b16671f744b4d8ab2f80661eb1b360a04774a5..94ba814fa51ff7becf6049acfdd29b6ce30cf3e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -109,7 +109,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
         throw new AnalysisException("Saving data into a view is not allowed.")
       }
 
-      if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
+      if (DDLUtils.isHiveTable(existingTable)) {
         throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
           "not supported yet. Please use the insertInto() API as an alternative.")
       }
@@ -233,7 +233,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
     checkDuplication(normalizedPartitionCols, "partition")
 
     if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
-      if (table.provider.get == DDLUtils.HIVE_PROVIDER) {
+      if (DDLUtils.isHiveTable(table)) {
         // When we hit this branch, it means users didn't specify schema for the table to be
         // created, as we always include partition columns in table schema for hive serde tables.
         // The real schema will be inferred at hive metastore by hive serde, plus the given
@@ -380,8 +380,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
 object HiveOnlyCheck extends (LogicalPlan => Unit) {
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
-      case CreateTable(tableDesc, _, Some(_))
-          if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
+      case CreateTable(tableDesc, _, Some(_)) if DDLUtils.isHiveTable(tableDesc) =>
         throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")
 
       case _ => // OK
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index 52e648a917d8bd8d7eae85878e2d3c78e753c095..ca46a1151e3e1d534cd00e7650312735faaeba72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -17,12 +17,49 @@
 
 package org.apache.spark.sql.internal
 
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+
 case class HiveSerDe(
   inputFormat: Option[String] = None,
   outputFormat: Option[String] = None,
   serde: Option[String] = None)
 
 object HiveSerDe {
+  val serdeMap = Map(
+    "sequencefile" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+        outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),
+
+    "rcfile" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")),
+
+    "orc" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
+
+    "parquet" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
+
+    "textfile" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+
+    "avro" ->
+      HiveSerDe(
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
+
   /**
    * Get the Hive SerDe information from the data source abbreviation string or classname.
    *
@@ -31,41 +68,6 @@ object HiveSerDe {
    * @return HiveSerDe associated with the specified source
    */
   def sourceToSerDe(source: String): Option[HiveSerDe] = {
-    val serdeMap = Map(
-      "sequencefile" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
-          outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),
-
-      "rcfile" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")),
-
-      "orc" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
-
-      "parquet" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
-
-      "textfile" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-
-      "avro" ->
-        HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
-
     val key = source.toLowerCase match {
       case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
       case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
@@ -77,4 +79,16 @@ object HiveSerDe {
 
     serdeMap.get(key)
   }
+
+  def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = {
+    val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
+    val defaultHiveSerde = sourceToSerDe(defaultStorageType)
+    CatalogStorageFormat.empty.copy(
+      inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
+        .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
+      outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
+        .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
+      serde = defaultHiveSerde.flatMap(_.serde)
+        .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index b070138be05d53e95d2962dd6e13a4f7f8a9ef9b..15e490fb30a277eb3f76833a061c0c62c911a8f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -121,7 +121,8 @@ class SparkSqlParserSuite extends PlanTest {
       tableType: CatalogTableType = CatalogTableType.MANAGED,
       storage: CatalogStorageFormat = CatalogStorageFormat.empty.copy(
         inputFormat = HiveSerDe.sourceToSerDe("textfile").get.inputFormat,
-        outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat),
+        outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat,
+        serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")),
       schema: StructType = new StructType,
       provider: Option[String] = Some("hive"),
       partitionColumnNames: Seq[String] = Seq.empty,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 1a5e5226c2ec9fe8a66fcf1404943f860eb5659d..76bb9e5929a7138c2a96963941adee6efc7f22fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -236,7 +236,7 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed4, expected4)
   }
 
-  test("create table - table file format") {
+  test("create hive table - table file format") {
     val allSources = Seq("parquet", "parquetfile", "orc", "orcfile", "avro", "avrofile",
       "sequencefile", "rcfile", "textfile")
 
@@ -245,13 +245,14 @@ class DDLCommandSuite extends PlanTest {
       val ct = parseAs[CreateTable](query)
       val hiveSerde = HiveSerDe.sourceToSerDe(s)
       assert(hiveSerde.isDefined)
-      assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+      assert(ct.tableDesc.storage.serde ==
+        hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
       assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
       assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
     }
   }
 
-  test("create table - row format and table file format") {
+  test("create hive table - row format and table file format") {
     val createTableStart = "CREATE TABLE my_tab ROW FORMAT"
     val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'"
     val query1 = s"$createTableStart SERDE 'anything' $fileFormat"
@@ -262,13 +263,15 @@ class DDLCommandSuite extends PlanTest {
     assert(parsed1.tableDesc.storage.serde == Some("anything"))
     assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt"))
     assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt"))
+
     val parsed2 = parseAs[CreateTable](query2)
-    assert(parsed2.tableDesc.storage.serde.isEmpty)
+    assert(parsed2.tableDesc.storage.serde ==
+      Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
     assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt"))
     assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt"))
   }
 
-  test("create table - row format serde and generic file format") {
+  test("create hive table - row format serde and generic file format") {
     val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
     val supportedSources = Set("sequencefile", "rcfile", "textfile")
 
@@ -287,7 +290,7 @@ class DDLCommandSuite extends PlanTest {
     }
   }
 
-  test("create table - row format delimited and generic file format") {
+  test("create hive table - row format delimited and generic file format") {
     val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
     val supportedSources = Set("textfile")
 
@@ -297,7 +300,8 @@ class DDLCommandSuite extends PlanTest {
         val ct = parseAs[CreateTable](query)
         val hiveSerde = HiveSerDe.sourceToSerDe(s)
         assert(hiveSerde.isDefined)
-        assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+        assert(ct.tableDesc.storage.serde ==
+          hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
         assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
         assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
       } else {
@@ -306,7 +310,7 @@ class DDLCommandSuite extends PlanTest {
     }
   }
 
-  test("create external table - location must be specified") {
+  test("create hive external table - location must be specified") {
     assertUnsupported(
       sql = "CREATE EXTERNAL TABLE my_tab",
       containsThesePhrases = Seq("create external table", "location"))
@@ -316,7 +320,7 @@ class DDLCommandSuite extends PlanTest {
     assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
   }
 
-  test("create table - property values must be set") {
+  test("create hive table - property values must be set") {
     assertUnsupported(
       sql = "CREATE TABLE my_tab TBLPROPERTIES('key_without_value', 'key_with_value'='x')",
       containsThesePhrases = Seq("key_without_value"))
@@ -326,14 +330,14 @@ class DDLCommandSuite extends PlanTest {
       containsThesePhrases = Seq("key_without_value"))
   }
 
-  test("create table - location implies external") {
+  test("create hive table - location implies external") {
     val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
     val ct = parseAs[CreateTable](query)
     assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
     assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
   }
 
-  test("create table using - with partitioned by") {
+  test("create table - with partitioned by") {
     val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
       "USING parquet PARTITIONED BY (a)"
 
@@ -357,7 +361,7 @@ class DDLCommandSuite extends PlanTest {
     }
   }
 
-  test("create table using - with bucket") {
+  test("create table - with bucket") {
     val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
       "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
 
@@ -379,6 +383,57 @@ class DDLCommandSuite extends PlanTest {
     }
   }
 
+  test("create table - with comment") {
+    val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("my_tab"),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType().add("a", IntegerType).add("b", StringType),
+      provider = Some("parquet"),
+      comment = Some("abc"))
+
+    parser.parsePlan(sql) match {
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
+      case other =>
+        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
+          s"got ${other.getClass.getName}: $sql")
+    }
+  }
+
+  test("create table - with location") {
+    val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'"
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("my_tab"),
+      tableType = CatalogTableType.EXTERNAL,
+      storage = CatalogStorageFormat.empty.copy(locationUri = Some("/tmp/file")),
+      schema = new StructType().add("a", IntegerType).add("b", StringType),
+      provider = Some("parquet"))
+
+    parser.parsePlan(v1) match {
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
+      case other =>
+        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
+          s"got ${other.getClass.getName}: $v1")
+    }
+
+    val v2 =
+      """
+        |CREATE TABLE my_tab(a INT, b STRING)
+        |USING parquet
+        |OPTIONS (path '/tmp/file')
+        |LOCATION '/tmp/file'
+      """.stripMargin
+    val e = intercept[ParseException] {
+      parser.parsePlan(v2)
+    }
+    assert(e.message.contains("you can only specify one of them."))
+  }
+
   // ALTER TABLE table_name RENAME TO new_table_name;
   // ALTER VIEW view_name RENAME TO new_view_name;
   test("alter table/view: rename table/view") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index fde6d4a94762d7a6b7e2bc4c2bacba419843a79d..474a2c868e20af6bb89bedb2a6d3d7758236c73c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -215,7 +215,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         tableDefinition.storage.locationUri
       }
 
-      if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
+      if (DDLUtils.isHiveTable(tableDefinition)) {
         val tableWithDataSourceProps = tableDefinition.copy(
           // We can't leave `locationUri` empty and count on Hive metastore to set a default table
           // location, because Hive metastore uses hive.metastore.warehouse.dir to generate default
@@ -533,7 +533,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     } else {
       val oldTableDef = getRawTable(db, withStatsProps.identifier.table)
 
-      val newStorage = if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
+      val newStorage = if (DDLUtils.isHiveTable(tableDefinition)) {
         tableDefinition.storage
       } else {
         // We can't alter the table storage of data source table directly for 2 reasons:
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index bea073fb4848aaf93275f7e17e6e8bdc30e90422..52892f1c7270c42a69ecae180e0f733493b2fcb0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -64,6 +64,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
         AnalyzeCreateTable(sparkSession) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
+        new DetermineHiveSerde(conf) ::
         (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 773c4a39d854c859443fdfc710e659b9b74d0e03..6d5cc5778aef8d8ed3715418c0d46a9519a915d4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,14 +18,73 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+
+
+/**
+ * Determine the serde/format of the Hive serde table, according to the storage properties.
+ */
+class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) && t.storage.serde.isEmpty =>
+      if (t.bucketSpec.isDefined) {
+        throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.")
+      }
+      if (t.partitionColumnNames.nonEmpty && query.isDefined) {
+        val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
+          "create a partitioned table using Hive's file formats. " +
+          "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
+          "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
+          "CTAS statement."
+        throw new AnalysisException(errorMessage)
+      }
+
+      val defaultStorage = HiveSerDe.getDefaultStorage(conf)
+      val options = new HiveOptions(t.storage.properties)
+
+      val fileStorage = if (options.fileFormat.isDefined) {
+        HiveSerDe.sourceToSerDe(options.fileFormat.get) match {
+          case Some(s) =>
+            CatalogStorageFormat.empty.copy(
+              inputFormat = s.inputFormat,
+              outputFormat = s.outputFormat,
+              serde = s.serde)
+          case None =>
+            throw new IllegalArgumentException(s"invalid fileFormat: '${options.fileFormat.get}'")
+        }
+      } else if (options.hasInputOutputFormat) {
+        CatalogStorageFormat.empty.copy(
+          inputFormat = options.inputFormat,
+          outputFormat = options.outputFormat)
+      } else {
+        CatalogStorageFormat.empty
+      }
+
+      val rowStorage = if (options.serde.isDefined) {
+        CatalogStorageFormat.empty.copy(serde = options.serde)
+      } else {
+        CatalogStorageFormat.empty
+      }
+
+      val storage = t.storage.copy(
+        inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
+        outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
+        serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
+        properties = options.serdeProperties)
+
+      c.copy(tableDesc = t.copy(storage = storage))
+  }
+}
 
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
@@ -49,15 +108,7 @@ private[hive] trait HiveStrategies {
         InsertIntoHiveTable(
           table, partition, planLater(child), overwrite, ifNotExists) :: Nil
 
-      case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
-        val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
-          // add default serde
-          tableDesc.withNewStorage(
-            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-        } else {
-          tableDesc
-        }
-
+      case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
         // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
         // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
         // tables yet.
@@ -68,7 +119,7 @@ private[hive] trait HiveStrategies {
 
         val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
         val cmd = CreateHiveTableAsSelectCommand(
-          newTableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
+          tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
           query,
           mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
new file mode 100644
index 0000000000000000000000000000000000000000..35b7a681f12e07841049ecc37f81f0b312e6b57d
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/**
+ * Options for the Hive data source. Note that rule `DetermineHiveSerde` will extract Hive
+ * serde/format information from these options.
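+ *
+ * For example, an illustrative construction (the option map here is not taken from the source):
+ * {{{
+ *   val opts = new HiveOptions(Map("fileFormat" -> "parquet", "my.prop" -> "1"))
+ *   opts.fileFormat       // Some("parquet")
+ *   opts.serdeProperties  // Map("my.prop" -> "1")
+ * }}}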
+ */
+class HiveOptions(@transient private val parameters: CaseInsensitiveMap) extends Serializable {
+  import HiveOptions._
+
+  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
+  val fileFormat = parameters.get(FILE_FORMAT).map(_.toLowerCase)
+  val inputFormat = parameters.get(INPUT_FORMAT)
+  val outputFormat = parameters.get(OUTPUT_FORMAT)
+
+  if (inputFormat.isDefined != outputFormat.isDefined) {
+    throw new IllegalArgumentException("Cannot specify only inputFormat or outputFormat, you " +
+      "have to specify both of them.")
+  }
+
+  def hasInputOutputFormat: Boolean = inputFormat.isDefined
+
+  if (fileFormat.isDefined && inputFormat.isDefined) {
+    throw new IllegalArgumentException("Cannot specify fileFormat and inputFormat/outputFormat " +
+      "together for Hive data source.")
+  }
+
+  val serde = parameters.get(SERDE)
+
+  if (fileFormat.isDefined && serde.isDefined) {
+    if (!Set("sequencefile", "textfile", "rcfile").contains(fileFormat.get)) {
+      throw new IllegalArgumentException(
+        s"fileFormat '${fileFormat.get}' already specifies a serde.")
+    }
+  }
+
+  val containsDelimiters = delimiterOptions.keys.exists(parameters.contains)
+
+  if (containsDelimiters) {
+    if (serde.isDefined) {
+      throw new IllegalArgumentException("Cannot specify delimiters with a custom serde.")
+    }
+    if (fileFormat.isEmpty) {
+      throw new IllegalArgumentException("Cannot specify delimiters without fileFormat.")
+    }
+    if (fileFormat.get != "textfile") {
+      throw new IllegalArgumentException("Cannot specify delimiters as they are only compatible " +
+        s"with fileFormat 'textfile', not ${fileFormat.get}.")
+    }
+  }
+
+  for (lineDelim <- parameters.get("lineDelim") if lineDelim != "\n") {
+    throw new IllegalArgumentException("Hive data source only support newline '\\n' as " +
+      s"line delimiter, but given: $lineDelim.")
+  }
+
+  def serdeProperties: Map[String, String] = parameters.filterKeys {
+    k => !lowerCasedOptionNames.contains(k.toLowerCase)
+  }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v }
+}
+
+object HiveOptions {
+  private val lowerCasedOptionNames = collection.mutable.Set[String]()
+
+  private def newOption(name: String): String = {
+    lowerCasedOptionNames += name.toLowerCase
+    name
+  }
+
+  val FILE_FORMAT = newOption("fileFormat")
+  val INPUT_FORMAT = newOption("inputFormat")
+  val OUTPUT_FORMAT = newOption("outputFormat")
+  val SERDE = newOption("serde")
+
+  // A map from the public delimiter option keys to the underlying Hive serde property keys.
+  val delimiterOptions = Map(
+    "fieldDelim" -> "field.delim",
+    "escapeDelim" -> "escape.delim",
+    // The following typo is inherited from Hive...
+    "collectionDelim" -> "colelction.delim",
+    "mapkeyDelim" -> "mapkey.delim",
+    "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase -> v }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 3f1f86c278db008a54014d740c4754e6b6d71928..5a3fcd7a759c0e272a406626980499b620797ec4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types.StructType
 
-private[orc] object OrcFileOperator extends Logging {
+private[hive] object OrcFileOperator extends Logging {
   /**
    * Retrieves an ORC file reader from a given path.  The path can point to either a directory or a
    * single ORC file.  If it points to a directory, it picks any non-empty ORC file within that
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index d13e29b3029b138872fe798dfe81b53f22b3284d..b67e5f6fe57a125ceca309427d746b7a93fe7684 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformati
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
 
@@ -51,6 +50,12 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     assert(e.getMessage.toLowerCase.contains("operation not allowed"))
   }
 
+  private def analyzeCreateTable(sql: String): CatalogTable = {
+    TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect {
+      case CreateTable(tableDesc, mode, _) => tableDesc
+    }.head
+  }
+
   test("Test CTAS #1") {
     val s1 =
       """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
@@ -76,7 +81,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
     assert(desc.storage.serde ==
       Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
-    assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
+    assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2"))
   }
 
   test("Test CTAS #2") {
@@ -107,7 +112,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
     assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
     assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe"))
-    assert(desc.properties == Map(("p1", "v1"), ("p2", "v2")))
+    assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2"))
   }
 
   test("Test CTAS #3") {
@@ -125,7 +130,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
     assert(desc.storage.outputFormat ==
       Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
-    assert(desc.storage.serde.isEmpty)
+    assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
     assert(desc.properties == Map())
   }
 
@@ -305,7 +310,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
       Some("org.apache.hadoop.mapred.TextInputFormat"))
     assert(desc.storage.outputFormat ==
       Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
-    assert(desc.storage.serde.isEmpty)
+    assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
     assert(desc.storage.properties.isEmpty)
     assert(desc.properties.isEmpty)
     assert(desc.comment.isEmpty)
@@ -412,7 +417,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     val (desc2, _) = extractTableDesc(query2)
     assert(desc1.storage.inputFormat == Some("winput"))
     assert(desc1.storage.outputFormat == Some("wowput"))
-    assert(desc1.storage.serde.isEmpty)
+    assert(desc1.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
     assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
     assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
     assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
@@ -592,4 +597,94 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
     assert(hiveClient.getConf("hive.in.test", "") == "true")
   }
+
+  test("create hive serde table with new syntax - basic") {
+    val sql =
+      """
+        |CREATE TABLE t
+        |(id int, name string COMMENT 'blabla')
+        |USING hive
+        |OPTIONS (fileFormat 'parquet', my_prop 1)
+        |LOCATION '/tmp/file'
+        |COMMENT 'BLABLA'
+      """.stripMargin
+
+    val table = analyzeCreateTable(sql)
+    assert(table.schema == new StructType()
+      .add("id", "int")
+      .add("name", "string", nullable = true, comment = "blabla"))
+    assert(table.provider == Some(DDLUtils.HIVE_PROVIDER))
+    assert(table.storage.locationUri == Some("/tmp/file"))
+    assert(table.storage.properties == Map("my_prop" -> "1"))
+    assert(table.comment == Some("BLABLA"))
+
+    assert(table.storage.inputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+    assert(table.storage.outputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+    assert(table.storage.serde ==
+      Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+  }
+
+  test("create hive serde table with new syntax - with partition and bucketing") {
+    val v1 = "CREATE TABLE t (c1 int, c2 int) USING hive PARTITIONED BY (c2)"
+    val table = analyzeCreateTable(v1)
+    assert(table.schema == new StructType().add("c1", "int").add("c2", "int"))
+    assert(table.partitionColumnNames == Seq("c2"))
+    // check the default formats
+    assert(table.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+    assert(table.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
+    assert(table.storage.outputFormat ==
+      Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
+
+    val v2 = "CREATE TABLE t (c1 int, c2 int) USING hive CLUSTERED BY (c2) INTO 4 BUCKETS"
+    val e2 = intercept[AnalysisException](analyzeCreateTable(v2))
+    assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
+
+    val v3 =
+      """
+        |CREATE TABLE t (c1 int, c2 int) USING hive
+        |PARTITIONED BY (c2)
+        |CLUSTERED BY (c2) INTO 4 BUCKETS""".stripMargin
+    val e3 = intercept[AnalysisException](analyzeCreateTable(v3))
+    assert(e3.message.contains("Creating bucketed Hive serde table is not supported yet"))
+  }
+
+  test("create hive serde table with new syntax - Hive options error checking") {
+    val v1 = "CREATE TABLE t (c1 int) USING hive OPTIONS (inputFormat 'abc')"
+    val e1 = intercept[IllegalArgumentException](analyzeCreateTable(v1))
+    assert(e1.getMessage.contains("Cannot specify only inputFormat or outputFormat"))
+
+    val v2 = "CREATE TABLE t (c1 int) USING hive OPTIONS " +
+      "(fileFormat 'x', inputFormat 'a', outputFormat 'b')"
+    val e2 = intercept[IllegalArgumentException](analyzeCreateTable(v2))
+    assert(e2.getMessage.contains(
+      "Cannot specify fileFormat and inputFormat/outputFormat together"))
+
+    val v3 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', serde 'a')"
+    val e3 = intercept[IllegalArgumentException](analyzeCreateTable(v3))
+    assert(e3.getMessage.contains("fileFormat 'parquet' already specifies a serde"))
+
+    val v4 = "CREATE TABLE t (c1 int) USING hive OPTIONS (serde 'a', fieldDelim ' ')"
+    val e4 = intercept[IllegalArgumentException](analyzeCreateTable(v4))
+    assert(e4.getMessage.contains("Cannot specify delimiters with a custom serde"))
+
+    val v5 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fieldDelim ' ')"
+    val e5 = intercept[IllegalArgumentException](analyzeCreateTable(v5))
+    assert(e5.getMessage.contains("Cannot specify delimiters without fileFormat"))
+
+    val v6 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', fieldDelim ' ')"
+    val e6 = intercept[IllegalArgumentException](analyzeCreateTable(v6))
+    assert(e6.getMessage.contains(
+      "Cannot specify delimiters as they are only compatible with fileFormat 'textfile'"))
+
+    // The value of 'fileFormat' option is case-insensitive.
+    val v7 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'TEXTFILE', lineDelim ',')"
+    val e7 = intercept[IllegalArgumentException](analyzeCreateTable(v7))
+    assert(e7.getMessage.contains("Hive data source only support newline '\\n' as line delimiter"))
+
+    val v8 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'wrong')"
+    val e8 = intercept[IllegalArgumentException](analyzeCreateTable(v8))
+    assert(e8.getMessage.contains("invalid fileFormat: 'wrong'"))
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 6fee45824ea3fe0784944fa735bba79b83c572b6..2f02bb5d3b6492e8f58e6045b53ff649ccc7ed87 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -68,6 +68,6 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
 
     val rawTable = externalCatalog.client.getTable("db1", "hive_tbl")
     assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
-    assert(externalCatalog.getTable("db1", "hive_tbl").provider == Some(DDLUtils.HIVE_PROVIDER))
+    assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl")))
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 2b8d4e2bb3bb5f25111d42637c2c9c4f12b5e84d..aed825e2f3d26577898a0584c008cbc0ffbc3a6d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1189,21 +1189,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
-  test("create a data source table using hive") {
-    val tableName = "tab1"
-    withTable (tableName) {
-      val e = intercept[AnalysisException] {
-        sql(
-          s"""
-             |CREATE TABLE $tableName
-             |(col1 int)
-             |USING hive
-           """.stripMargin)
-      }.getMessage
-      assert(e.contains("Cannot create hive serde table with CREATE TABLE USING"))
-    }
-  }
-
   test("create a temp view using hive") {
     val tableName = "tab1"
     withTable (tableName) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 8b3421953025919c32d7fa2099b63ceeed194d16..3ac07d093381c8ea1ab4004fb0bd3179aae1fb86 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, Cat
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -1250,4 +1251,42 @@ class HiveDDLSuite
       assert(e.message.contains("unknown is not a valid partition column"))
     }
   }
+
+  test("create hive serde table with new syntax") {
+    withTable("t", "t2", "t3") {
+      withTempPath { path =>
+        sql(
+          s"""
+            |CREATE TABLE t(id int) USING hive
+            |OPTIONS(fileFormat 'orc', compression 'Zlib')
+            |LOCATION '${path.getCanonicalPath}'
+          """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(DDLUtils.isHiveTable(table))
+        assert(table.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+        assert(table.storage.properties.get("compression") == Some("Zlib"))
+        assert(spark.table("t").collect().isEmpty)
+
+        sql("INSERT INTO t SELECT 1")
+        checkAnswer(spark.table("t"), Row(1))
+        // Check if this is compressed as ZLIB.
+        val maybeOrcFile = path.listFiles().find(_.getName.endsWith("part-00000"))
+        assert(maybeOrcFile.isDefined)
+        val orcFilePath = maybeOrcFile.get.toPath.toString
+        val expectedCompressionKind =
+          OrcFileOperator.getFileReader(orcFilePath).get.getCompression
+        assert("ZLIB" === expectedCompressionKind.name())
+
+        sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2")
+        val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
+        assert(DDLUtils.isHiveTable(table2))
+        assert(table2.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+        checkAnswer(spark.table("t2"), Row(1, "a"))
+
+        sql("CREATE TABLE t3(a int, p int) USING hive PARTITIONED BY (p)")
+        sql("INSERT INTO t3 PARTITION(p=1) SELECT 0")
+        checkAnswer(spark.table("t3"), Row(0, 1))
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 463c368fc42b183c245f5964a9c1f41a626152b7..e678cf6f22c207f2ff26e17213412521f815fa33 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -93,8 +93,6 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
         .orc(path)
 
       // Check if this is compressed as ZLIB.
-      val conf = spark.sessionState.newHadoopConf()
-      val fs = FileSystem.getLocal(conf)
       val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
       assert(maybeOrcFile.isDefined)
       val orcFilePath = maybeOrcFile.get.toPath.toString