From ca83f1e2c4dfa519e44b837b6815cba3b4526d92 Mon Sep 17 00:00:00 2001
From: Cheng Hao <hao.cheng@intel.com>
Date: Thu, 11 Sep 2014 11:57:01 -0700
Subject: [PATCH] [SPARK-2917] [SQL] Avoid table creation during logical plan
 analysis for CTAS

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1846 from chenghao-intel/ctas and squashes the following commits:

56a0578 [Cheng Hao] remove the unused imports
9a57abc [Cheng Hao] Avoid table creation in logical plan analyzing
---
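Notes: previously, HiveMetastoreCatalog created the target table eagerly while
the logical plan was still being analyzed, then rewrote the plan into a plain
InsertIntoTable; a CTAS whose query later failed could therefore leave an empty
table behind in the metastore. With this patch, analysis only resolves the
database name, and the table is created by the new physical CreateTableAsSelect
operator when the plan actually runs. A minimal sketch of the statement shape
this affects, mirroring the new test case below (the table name ctas_demo is
hypothetical; src is the standard TestHive fixture table):

    import org.apache.spark.sql.hive.test.TestHive._

    // Parsed into the logical CreateTableAsSelect(db, "ctas_demo", query);
    // analysis fills in the current database but no longer touches the
    // metastore. As a side-effecting command it is executed eagerly, so the
    // table exists and is populated once sql() returns.
    sql("CREATE TABLE ctas_demo AS SELECT key, value FROM src")

    // Read the result back.
    sql("SELECT key, value FROM ctas_demo ORDER BY key").collect()
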
 .../plans/logical/basicOperators.scala        |  3 +-
 .../org/apache/spark/sql/SchemaRDDLike.scala  |  4 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 14 +---
 .../org/apache/spark/sql/hive/HiveQl.scala    |  2 +-
 .../spark/sql/hive/HiveStrategies.scala       | 10 +++
 .../hive/execution/CreateTableAsSelect.scala  | 73 +++++++++++++++++++
 .../hive/execution/InsertIntoHiveTable.scala  |  6 +-
 .../sql/hive/execution/SQLQuerySuite.scala    |  9 +++
 8 files changed, 104 insertions(+), 17 deletions(-)
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 4adfb18937..5d10754c7b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -114,11 +114,12 @@ case class InsertIntoTable(
   }
 }
 
-case class InsertIntoCreatedTable(
+case class CreateTableAsSelect(
     databaseName: Option[String],
     tableName: String,
     child: LogicalPlan) extends UnaryNode {
   override def output = child.output
+  override lazy val resolved = databaseName.isDefined && childrenResolved
 }
 
 case class WriteToFile(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 2f3033a5f9..e52eeb3e1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
   @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
     // For various commands (like DDL) and queries with side effects, we force query optimization to
     // happen right away to let these side effects take place eagerly.
-    case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect | _: WriteToFile =>
       queryExecution.toRdd
       SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
     case _ =>
@@ -124,7 +124,7 @@ private[sql] trait SchemaRDDLike {
    */
   @Experimental
   def saveAsTable(tableName: String): Unit =
-    sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
+    sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan)).toRdd
 
   /** Returns the schema as a string in the tree format.
    *
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index dfa2a7a9d2..2c0db9be57 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -54,8 +54,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       db: Option[String],
       tableName: String,
       alias: Option[String]): LogicalPlan = synchronized {
-    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-    val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
+    val (databaseName, tblName) = processDatabaseAndTableName(
+      db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
     val table = client.getTable(databaseName, tblName)
     val partitions: Seq[Partition] =
       if (table.isPartitioned) {
@@ -112,17 +112,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
-      case InsertIntoCreatedTable(db, tableName, child) =>
+      case CreateTableAsSelect(db, tableName, child) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
         val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
-        createTable(databaseName, tblName, child.output)
-
-        InsertIntoTable(
-          lookupRelation(Some(databaseName), tblName, None),
-          Map.empty,
-          child,
-          overwrite = false)
+        CreateTableAsSelect(Some(databaseName), tblName, child)
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index c98287c6aa..21ecf17028 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -489,7 +489,7 @@ private[hive] object HiveQl {
 
       val (db, tableName) = extractDbNameTableName(tableNameParts)
 
-      InsertIntoCreatedTable(db, tableName, nodeToPlan(query))
+      CreateTableAsSelect(db, tableName, nodeToPlan(query))
 
     // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
     case Token("TOK_CREATETABLE", _) => NativePlaceholder
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 72cc01cdf4..43dd3d234f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -165,6 +165,16 @@ private[hive] trait HiveStrategies {
              InMemoryRelation(_, _, _,
                HiveTableScan(_, table, _)), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+      case logical.CreateTableAsSelect(database, tableName, child) =>
+        val query = planLater(child)
+        CreateTableAsSelect(
+          database.get,
+          tableName,
+          query,
+          InsertIntoHiveTable(_: MetastoreRelation,
+            Map(),
+            query,
+            true)(hiveContext)) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
new file mode 100644
index 0000000000..71ea774d77
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LowerCaseSchema
+import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode}
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.MetastoreRelation
+
+/**
+ * :: Experimental ::
+ * Create the target table and insert the query result into it.
+ * @param database the database name of the new relation
+ * @param tableName the table name of the new relation
+ * @param insertIntoRelation a function that builds the `InsertIntoHiveTable` operator
+ *        for a given `MetastoreRelation`; the query result is inserted into that table.
+ * TODO Add more table-creation properties, e.g. SerDe, StorageHandler, in-memory cache, etc.
+ */
+@Experimental
+case class CreateTableAsSelect(
+  database: String,
+  tableName: String,
+  query: SparkPlan,
+  insertIntoRelation: MetastoreRelation => InsertIntoHiveTable)
+    extends LeafNode with Command {
+
+  def output = Seq.empty
+
+  // Lazily create the table and look up its MetastoreRelation on first use
+  private[this] lazy val metastoreRelation: MetastoreRelation = {
+    // Create the table in the metastore
+    val sc = sqlContext.asInstanceOf[HiveContext]
+    sc.catalog.createTable(database, tableName, query.output, false)
+    // Look up the MetastoreRelation for the newly created table
+    sc.catalog.lookupRelation(Some(database), tableName, None) match {
+      case LowerCaseSchema(r: MetastoreRelation) => r
+      case o: MetastoreRelation => o
+    }
+  }
+
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+    insertIntoRelation(metastoreRelation).execute()
+    Seq.empty[Row]
+  }
+
+  override def execute(): RDD[Row] = {
+    sideEffectResult
+    sparkContext.emptyRDD[Row]
+  }
+
+  override def argString: String = {
+    s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]\n" + query.toString
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 39033bdeac..a284a91a91 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -53,9 +53,9 @@ case class InsertIntoHiveTable(
     (@transient sc: HiveContext)
   extends UnaryNode {
 
-  val outputClass = newSerializer(table.tableDesc).getSerializedClass
-  @transient private val hiveContext = new Context(sc.hiveconf)
-  @transient private val db = Hive.get(sc.hiveconf)
+  @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
+  @transient private lazy val hiveContext = new Context(sc.hiveconf)
+  @transient private lazy val db = Hive.get(sc.hiveconf)
 
   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b99caf77bc..679efe082f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.QueryTest
+
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.test.TestHive._
 
 case class Nested1(f1: Nested2)
@@ -54,4 +56,11 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT f1.f2.f3 FROM nested"),
       1)
   }
+
+  test("test CTAS") {
+    checkAnswer(sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src"), Seq.empty[Row])
+    checkAnswer(
+      sql("SELECT key, value FROM test_ctas_123 ORDER BY key"), 
+      sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
+  }
 }
-- 
GitLab