diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 25678e938d8462826df84dcc3d138c647349cad0..50ae9667f4c8f89835949a79665ed7a5d7d35559 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -561,7 +561,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
           CreateTableUsingAsSelect(
             tableIdent,
             source,
-            temporary = false,
             partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
             getBucketSpec,
             mode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 01409c6a77c1e014c257213cfaba31f352a8065e..8ffc55668ae901e2a959c079377bff9d70871fb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -317,17 +317,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       // Get the backing query.
       val query = plan(ctx.query)
 
+      if (temp) {
+        throw operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
+      }
+
       // Determine the storage mode.
       val mode = if (ifNotExists) {
         SaveMode.Ignore
-      } else if (temp) {
-        SaveMode.Overwrite
       } else {
         SaveMode.ErrorIfExists
       }
 
       CreateTableUsingAsSelect(
-        table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
+        table, provider, partitionColumnNames, bucketSpec, mode, options, query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
       CreateTableUsing(
@@ -960,7 +962,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
           CreateTableUsingAsSelect(
             tableIdent = tableDesc.identifier,
             provider = conf.defaultDataSourceName,
-            temporary = false,
             partitionColumns = tableDesc.partitionColumnNames.toArray,
             bucketSpec = None,
             mode = mode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 9610506e138ff19d1c7a3e02701058bcb732dbaa..b20897e2d6599578a7668ef998f33a5eabfae011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -397,15 +397,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         throw new AnalysisException(
           "allowExisting should be set to false when creating a temporary table.")
 
-      case c: CreateTableUsingAsSelect if c.temporary && c.partitionColumns.nonEmpty =>
-        sys.error("Cannot create temporary partitioned table.")
-
-      case c: CreateTableUsingAsSelect if c.temporary =>
-        val cmd = CreateTempTableUsingAsSelectCommand(
-          c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child)
-        ExecutedCommandExec(cmd) :: Nil
-
-      case c: CreateTableUsingAsSelect if !c.temporary =>
+      case c: CreateTableUsingAsSelect =>
         val cmd =
           CreateDataSourceTableAsSelectCommand(
             c.tableIdent,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index edbccde214c6178167e8aa8315ab85df08d915f2..bf272e3c0659d3cba509753f4d25288ffc968c3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -56,7 +56,6 @@ case class CreateTableUsing(
 case class CreateTableUsingAsSelect(
     tableIdent: TableIdentifier,
     provider: String,
-    temporary: Boolean,
     partitionColumns: Array[String],
     bucketSpec: Option[BucketSpec],
     mode: SaveMode,
@@ -91,37 +90,6 @@ case class CreateTempTableUsing(
   }
 }
 
-case class CreateTempTableUsingAsSelectCommand(
-    tableIdent: TableIdentifier,
-    provider: String,
-    partitionColumns: Array[String],
-    mode: SaveMode,
-    options: Map[String, String],
-    query: LogicalPlan) extends RunnableCommand {
-
-  if (tableIdent.database.isDefined) {
-    throw new AnalysisException(
-      s"Temporary table '$tableIdent' should not have specified a database")
-  }
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val df = Dataset.ofRows(sparkSession, query)
-    val dataSource = DataSource(
-      sparkSession,
-      className = provider,
-      partitionColumns = partitionColumns,
-      bucketSpec = None,
-      options = options)
-    val result = dataSource.write(mode, df)
-    sparkSession.sessionState.catalog.createTempView(
-      tableIdent.table,
-      Dataset.ofRows(sparkSession, LogicalRelation(result)).logicalPlan,
-      overrideIfExists = true)
-
-    Seq.empty[Row]
-  }
-}
-
 case class RefreshTable(tableIdent: TableIdentifier)
   extends RunnableCommand {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index cbddb0643b26d9ea8b72c1fc12f667861ee12099..f9a07dbdf0be03da15758b723f84abab7c248f95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -17,16 +17,20 @@
 
 package org.apache.spark.sql.sources
 
-import java.io.{File, IOException}
+import java.io.File
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
 class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter {
+
   protected override lazy val sql = spark.sql _
   private var path: File = null
 
@@ -40,172 +44,175 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with
   override def afterAll(): Unit = {
     try {
       spark.catalog.dropTempView("jt")
+      if (path.exists()) {
+        Utils.deleteRecursively(path)
+      }
     } finally {
       super.afterAll()
     }
   }
 
-  after {
-    Utils.deleteRecursively(path)
+  before {
+    if (path.exists()) {
+      Utils.deleteRecursively(path)
+    }
   }
 
-  test("CREATE TEMPORARY TABLE AS SELECT") {
-    sql(
-      s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a, b FROM jt
-      """.stripMargin)
-
-    checkAnswer(
-      sql("SELECT a, b FROM jsonTable"),
-      sql("SELECT a, b FROM jt").collect())
-
-    spark.catalog.dropTempView("jsonTable")
+  test("CREATE TABLE USING AS SELECT") {
+    withTable("jsonTable") {
+      sql(
+        s"""
+           |CREATE TABLE jsonTable
+           |USING json
+           |OPTIONS (
+           |  path '${path.toString}'
+           |) AS
+           |SELECT a, b FROM jt
+         """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT a, b FROM jsonTable"),
+        sql("SELECT a, b FROM jt"))
+    }
   }
 
-  test("CREATE TEMPORARY TABLE AS SELECT based on the file without write permission") {
+  test("CREATE TABLE USING AS SELECT based on the file without write permission") {
     val childPath = new File(path.toString, "child")
     path.mkdir()
-    childPath.createNewFile()
     path.setWritable(false)
 
-    val e = intercept[IOException] {
+    val e = intercept[SparkException] {
       sql(
         s"""
-           |CREATE TEMPORARY TABLE jsonTable
+           |CREATE TABLE jsonTable
            |USING json
            |OPTIONS (
-           |  path '${path.toString}'
+           |  path '${childPath.toString}'
            |) AS
            |SELECT a, b FROM jt
-        """.stripMargin)
+         """.stripMargin)
       sql("SELECT a, b FROM jsonTable").collect()
     }
-    assert(e.getMessage().contains("Unable to clear output directory"))
 
+    assert(e.getMessage().contains("Job aborted"))
     path.setWritable(true)
   }
 
   test("create a table, drop it and create another one with the same name") {
-    sql(
-      s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a, b FROM jt
-      """.stripMargin)
-
-    checkAnswer(
-      sql("SELECT a, b FROM jsonTable"),
-      sql("SELECT a, b FROM jt").collect())
-
-    val message = intercept[ParseException]{
+    withTable("jsonTable") {
       sql(
         s"""
-        |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a * 4 FROM jt
-      """.stripMargin)
-    }.getMessage
-    assert(message.toLowerCase.contains("operation not allowed"))
-
-    // Overwrite the temporary table.
-    sql(
-      s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a * 4 FROM jt
-      """.stripMargin)
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      sql("SELECT a * 4 FROM jt").collect())
-
-    spark.catalog.dropTempView("jsonTable")
-    // Explicitly delete the data.
-    if (path.exists()) Utils.deleteRecursively(path)
-
-    sql(
-      s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT b FROM jt
-      """.stripMargin)
-
-    checkAnswer(
-      sql("SELECT * FROM jsonTable"),
-      sql("SELECT b FROM jt").collect())
-
-    spark.catalog.dropTempView("jsonTable")
-  }
+           |CREATE TABLE jsonTable
+           |USING json
+           |OPTIONS (
+           |  path '${path.toString}'
+           |) AS
+           |SELECT a, b FROM jt
+         """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT a, b FROM jsonTable"),
+        sql("SELECT a, b FROM jt"))
+
+      // Creating a table with the same name using "IF NOT EXISTS" should be a no-op.
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS jsonTable
+           |USING json
+           |OPTIONS (
+           |  path '${path.toString}'
+           |) AS
+           |SELECT a * 4 FROM jt
+         """.stripMargin)
+      checkAnswer(
+        sql("SELECT * FROM jsonTable"),
+        sql("SELECT a, b FROM jt"))
+
+      // Explicitly drops the table and deletes the underlying data.
+      sql("DROP TABLE jsonTable")
+      if (path.exists()) Utils.deleteRecursively(path)
 
-  test("CREATE TEMPORARY TABLE AS SELECT with IF NOT EXISTS is not allowed") {
-    val message = intercept[ParseException]{
+      // Creates a table with the same name again; this time it should succeed.
       sql(
         s"""
-        |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT b FROM jt
-      """.stripMargin)
-    }.getMessage
-    assert(message.toLowerCase.contains("operation not allowed"))
+           |CREATE TABLE jsonTable
+           |USING json
+           |OPTIONS (
+           |  path '${path.toString}'
+           |) AS
+           |SELECT b FROM jt
+         """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT * FROM jsonTable"),
+        sql("SELECT b FROM jt"))
+    }
+  }
+
+  test("disallows CREATE TEMPORARY TABLE ... USING ... AS query") {
+    withTable("t") {
+      val error = intercept[ParseException] {
+        sql(
+          s"""
+             |CREATE TEMPORARY TABLE t USING PARQUET
+             |OPTIONS (PATH '${path.toString}')
+             |PARTITIONED BY (a)
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+      }.getMessage
+      assert(error.contains("Operation not allowed") &&
+        error.contains("CREATE TEMPORARY TABLE ... USING ... AS query"))
+    }
   }
 
-  test("a CTAS statement with column definitions is not allowed") {
-    intercept[AnalysisException]{
+  test("disallows CREATE EXTERNAL TABLE ... USING ... AS query") {
+    withTable("t") {
+      val error = intercept[ParseException] {
+        sql(
+          s"""
+             |CREATE EXTERNAL TABLE t USING PARQUET
+             |OPTIONS (PATH '${path.toString}')
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+      }.getMessage
+
+      assert(error.contains("Operation not allowed") &&
+        error.contains("CREATE EXTERNAL TABLE ... USING"))
+    }
+  }
+
+  test("create table using as select - with partitioned by") {
+    val catalog = spark.sessionState.catalog
+    withTable("t") {
       sql(
         s"""
-        |CREATE TEMPORARY TABLE jsonTable (a int, b string)
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a, b FROM jt
-      """.stripMargin)
+           |CREATE TABLE t USING PARQUET
+           |OPTIONS (PATH '${path.toString}')
+           |PARTITIONED BY (a)
+           |AS SELECT 1 AS a, 2 AS b
+         """.stripMargin
+      )
+      val table = catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.getPartitionColumnsFromTableProperties(table) == Seq("a"))
     }
   }
 
-  test("it is not allowed to write to a table while querying it.") {
-    sql(
-      s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a, b FROM jt
-      """.stripMargin)
-
-    val message = intercept[AnalysisException] {
+  test("create table using as select - with bucket") {
+    val catalog = spark.sessionState.catalog
+    withTable("t") {
       sql(
         s"""
-        |CREATE TEMPORARY TABLE jsonTable
-        |USING json
-        |OPTIONS (
-        |  path '${path.toString}'
-        |) AS
-        |SELECT a, b FROM jsonTable
-      """.stripMargin)
-    }.getMessage
-    assert(
-      message.contains("Cannot overwrite table "),
-      "Writing to a table while querying it should not be allowed.")
+           |CREATE TABLE t USING PARQUET
+           |OPTIONS (PATH '${path.toString}')
+           |CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS
+           |AS SELECT 1 AS a, 2 AS b
+         """.stripMargin
+      )
+      val table = catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
+        Some(BucketSpec(5, Seq("a"), Seq("b"))))
+    }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 24de223cf899b8dad4703ffb600fd719eb11078b..499819f32b43409de5845270a5ca39366e5af100 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1506,52 +1506,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test(
-    "SPARK-14488 \"CREATE TEMPORARY TABLE ... USING ... AS SELECT ...\" " +
-    "shouldn't create persisted table"
-  ) {
-    withTempPath { dir =>
-      withTempTable("t1", "t2") {
-        val path = dir.getCanonicalPath
-        val ds = spark.range(10)
-        ds.createOrReplaceTempView("t1")
-
-        sql(
-          s"""CREATE TEMPORARY TABLE t2
-             |USING PARQUET
-             |OPTIONS (PATH '$path')
-             |AS SELECT * FROM t1
-           """.stripMargin)
-
-        checkAnswer(
-          spark.sql("SHOW TABLES").select('isTemporary).filter('tableName === "t2"),
-          Row(true)
-        )
-
-        checkAnswer(table("t2"), table("t1"))
-      }
-    }
-  }
-
-  test(
-    "SPARK-14493 \"CREATE TEMPORARY TABLE ... USING ... AS SELECT ...\" " +
-    "shouldn always be used together with PATH data source option"
-  ) {
-    withTempTable("t") {
-      spark.range(10).createOrReplaceTempView("t")
-
-      val message = intercept[IllegalArgumentException] {
-        sql(
-          s"""CREATE TEMPORARY TABLE t1
-             |USING PARQUET
-             |AS SELECT * FROM t
-           """.stripMargin)
-      }.getMessage
-
-      assert(message == "'path' is not specified")
-    }
-  }
-
   test("derived from Hive query file: drop_database_removes_partition_dirs.q") {
     // This test verifies that if a partition exists outside a table's current location when the
     // database is dropped the partition's location is dropped as well.