diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 7165db1d5d2adc989d2cdd5e5e687531e3bf2577..569b99e414c374f1fd16de05745efdd63cae52d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.io.File
+
 import scala.collection.mutable
 
 import org.apache.spark.sql.AnalysisException
@@ -114,6 +116,10 @@ class SessionCatalog(
     currentDb = db
   }
 
+  def getDefaultDBPath(db: String): String = {
+    System.getProperty("java.io.tmpdir") + File.separator + db + ".db"
+  }
+
   // ----------------------------------------------------------------------------
   // Tables
   // ----------------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 9f270236aeab3e5527dd7cf692f1600d9f45f157..303846d31332f3784427d30c8bb4fac50ec09a73 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -39,7 +39,7 @@ abstract class ExternalCatalog {
 
   protected def requireDbExists(db: String): Unit = {
     if (!databaseExists(db)) {
-      throw new AnalysisException(s"Database $db does not exist")
+      throw new AnalysisException(s"Database '$db' does not exist")
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
index d4d1992d2701c7a1d21b0c197ddc28472c1beb85..6fe04757ba2d172d35e687514d273efbdae747e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
@@ -97,7 +97,8 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
 
       // CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
       // [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)];
-      case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: args) =>
+      case Token("TOK_CREATEDATABASE", Token(dbName, Nil) :: args) =>
+        val databaseName = cleanIdentifier(dbName)
         val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq(
           "TOK_IFNOTEXISTS",
           "TOK_DATABASELOCATION",
@@ -126,7 +127,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
             extractProps(propList, "TOK_TABLEPROPERTY")
           case _ => parseFailed("Invalid CREATE DATABASE command", node)
         }.toMap
-        CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source)
+        CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)
 
       // DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
       case Token("TOK_DROPDATABASE", Token(dbName, Nil) :: otherArgs) =>
@@ -136,15 +137,15 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
         //   :- database_name
         //   :- TOK_IFEXISTS
         //   +- TOK_RESTRICT/TOK_CASCADE
-        val databaseName = unquoteString(dbName)
+        val databaseName = cleanIdentifier(dbName)
         // The default is RESTRICT
         val Seq(ifExists, _, cascade) = getClauses(Seq(
           "TOK_IFEXISTS", "TOK_RESTRICT", "TOK_CASCADE"), otherArgs)
-        DropDatabase(databaseName, ifExists.isDefined, restrict = cascade.isEmpty)(node.source)
+        DropDatabase(databaseName, ifExists.isDefined, cascade.isDefined)
 
       // ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
       case Token("TOK_ALTERDATABASE_PROPERTIES", Token(dbName, Nil) :: args) =>
-        val databaseName = unquoteString(dbName)
+        val databaseName = cleanIdentifier(dbName)
         val dbprops = getClause("TOK_DATABASEPROPERTIES", args)
         val props = dbprops match {
           case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) =>
@@ -161,13 +162,13 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
             extractProps(propList, "TOK_TABLEPROPERTY")
           case _ => parseFailed("Invalid ALTER DATABASE command", node)
         }
-        AlterDatabaseProperties(databaseName, props.toMap)(node.source)
+        AlterDatabaseProperties(databaseName, props.toMap)
 
       // DESCRIBE DATABASE [EXTENDED] db_name
       case Token("TOK_DESCDATABASE", Token(dbName, Nil) :: describeArgs) =>
-        val databaseName = unquoteString(dbName)
+        val databaseName = cleanIdentifier(dbName)
         val extended = getClauseOption("EXTENDED", describeArgs)
-        DescribeDatabase(databaseName, extended.isDefined)(node.source)
+        DescribeDatabase(databaseName, extended.isDefined)
 
       // CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
       // [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index a8313deeefc2f8a687637cee9898e95241bd83dd..8333074ecaf22d127e6d9693a7d6450f94b4e9a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -232,8 +232,7 @@ class SparkSqlAstBuilder extends AstBuilder {
       ctx.EXISTS != null,
       Option(ctx.locationSpec).map(visitLocationSpec),
       Option(ctx.comment).map(string),
-      Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))(
-      command(ctx))
+      Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))
   }
 
   /**
@@ -248,8 +247,7 @@ class SparkSqlAstBuilder extends AstBuilder {
       ctx: SetDatabasePropertiesContext): LogicalPlan = withOrigin(ctx) {
     AlterDatabaseProperties(
       ctx.identifier.getText,
-      visitTablePropertyList(ctx.tablePropertyList))(
-      command(ctx))
+      visitTablePropertyList(ctx.tablePropertyList))
   }
 
   /**
@@ -261,7 +259,7 @@ class SparkSqlAstBuilder extends AstBuilder {
    * }}}
    */
   override def visitDropDatabase(ctx: DropDatabaseContext): LogicalPlan = withOrigin(ctx) {
-    DropDatabase(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE == null)(command(ctx))
+    DropDatabase(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE != null)
   }
 
   /**
@@ -273,7 +271,7 @@ class SparkSqlAstBuilder extends AstBuilder {
    * }}}
    */
   override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) {
-    DescribeDatabase(ctx.identifier.getText, ctx.EXTENDED != null)(command(ctx))
+    DescribeDatabase(ctx.identifier.getText, ctx.EXTENDED != null)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 0e51abb44b91df7b718ebfe71a7ee7cb0bdb1743..6c2a67f81c50d69e31f42c53d2100bf800238539 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.BucketSpec
@@ -45,46 +45,135 @@ abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
 
 }
 
+/**
+ * A command for users to create a new database.
+ *
+ * It will issue an error message when the database with the same name already exists,
+ * unless 'ifNotExists' is true.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *    CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name
+ * }}}
+ */
 case class CreateDatabase(
     databaseName: String,
     ifNotExists: Boolean,
     path: Option[String],
     comment: Option[String],
-    props: Map[String, String])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    props: Map[String, String])
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    catalog.createDatabase(
+      CatalogDatabase(
+        databaseName,
+        comment.getOrElse(""),
+        path.getOrElse(catalog.getDefaultDBPath(databaseName)),
+        props),
+      ifNotExists)
+    Seq.empty[Row]
+  }
+
+  override val output: Seq[Attribute] = Seq.empty
+}
+
 
 /**
- * Drop Database: Removes a database from the system.
+ * A command for users to remove a database from the system.
  *
  * 'ifExists':
  * - true, if database_name does't exist, no action
  * - false (default), if database_name does't exist, a warning message will be issued
- * 'restric':
- * - true (default), the database cannot be dropped if it is not empty. The inclusive
- * tables must be dropped at first.
- * - false, it is in the Cascade mode. The dependent objects are automatically dropped
- * before dropping database.
+ * 'cascade':
+ * - true, the dependent objects are automatically dropped before dropping database.
+ * - false (default), it is in the Restrict mode. The database cannot be dropped if
+ * it is not empty. The inclusive tables must be dropped at first.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *    DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
+ * }}}
  */
 case class DropDatabase(
     databaseName: String,
     ifExists: Boolean,
-    restrict: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    cascade: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
+    Seq.empty[Row]
+  }
+
+  override val output: Seq[Attribute] = Seq.empty
+}
 
-/** ALTER DATABASE: add new (key, value) pairs into DBPROPERTIES */
+/**
+ * A command for users to add new (key, value) pairs into DBPROPERTIES
+ * If the database does not exist, an error message will be issued to indicate the database
+ * does not exist.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *    ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
+ * }}}
+ */
 case class AlterDatabaseProperties(
     databaseName: String,
-    props: Map[String, String])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    props: Map[String, String])
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val db: CatalogDatabase = catalog.getDatabase(databaseName)
+    catalog.alterDatabase(db.copy(properties = db.properties ++ props))
+
+    Seq.empty[Row]
+  }
+
+  override val output: Seq[Attribute] = Seq.empty
+}
 
 /**
- * DESCRIBE DATABASE: shows the name of the database, its comment (if one has been set), and its
+ * A command for users to show the name of the database, its comment (if one has been set), and its
  * root location on the filesystem. When extended is true, it also shows the database's properties
+ * If the database does not exist, an error message will be issued to indicate the database
+ * does not exist.
+ * The syntax of using this command in SQL is
+ * {{{
+ *    DESCRIBE DATABASE [EXTENDED] db_name
+ * }}}
  */
 case class DescribeDatabase(
     databaseName: String,
-    extended: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    extended: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val dbMetadata: CatalogDatabase = sqlContext.sessionState.catalog.getDatabase(databaseName)
+    val result =
+      Row("Database Name", dbMetadata.name) ::
+        Row("Description", dbMetadata.description) ::
+        Row("Location", dbMetadata.locationUri) :: Nil
+
+    if (extended) {
+      val properties =
+        if (dbMetadata.properties.isEmpty) {
+          ""
+        } else {
+          dbMetadata.properties.toSeq.mkString("(", ", ", ")")
+        }
+      result :+ Row("Properties", properties)
+    } else {
+      result
+    }
+  }
+
+  override val output: Seq[Attribute] = {
+    AttributeReference("database_description_item", StringType, nullable = false)() ::
+      AttributeReference("database_description_value", StringType, nullable = false)() :: Nil
+  }
+}
 
 case class CreateFunction(
     databaseName: Option[String],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 03079c6890a8416d6c8e1eff4cd0b999ab9a444e..ccbfd41cca22efd68c279e48400bd103d87f991b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -39,7 +39,7 @@ class DDLCommandSuite extends PlanTest {
       ifNotExists = true,
       Some("/home/user/db"),
       Some("database_comment"),
-      Map("a" -> "a", "b" -> "b", "c" -> "c"))(sql)
+      Map("a" -> "a", "b" -> "b", "c" -> "c"))
     comparePlans(parsed, expected)
   }
 
@@ -65,39 +65,27 @@ class DDLCommandSuite extends PlanTest {
     val expected1 = DropDatabase(
       "database_name",
       ifExists = true,
-      restrict = true)(sql1)
+      cascade = false)
     val expected2 = DropDatabase(
       "database_name",
       ifExists = true,
-      restrict = false)(sql2)
+      cascade = true)
     val expected3 = DropDatabase(
-      "database_name",
-      ifExists = true,
-      restrict = true)(sql3)
-    val expected4 = DropDatabase(
-      "database_name",
-      ifExists = true,
-      restrict = false)(sql4)
-    val expected5 = DropDatabase(
-      "database_name",
-      ifExists = true,
-      restrict = true)(sql5)
-    val expected6 = DropDatabase(
       "database_name",
       ifExists = false,
-      restrict = true)(sql6)
-    val expected7 = DropDatabase(
+      cascade = false)
+    val expected4 = DropDatabase(
       "database_name",
       ifExists = false,
-      restrict = false)(sql7)
+      cascade = true)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
-    comparePlans(parsed3, expected3)
-    comparePlans(parsed4, expected4)
-    comparePlans(parsed5, expected5)
-    comparePlans(parsed6, expected6)
-    comparePlans(parsed7, expected7)
+    comparePlans(parsed3, expected1)
+    comparePlans(parsed4, expected2)
+    comparePlans(parsed5, expected1)
+    comparePlans(parsed6, expected3)
+    comparePlans(parsed7, expected4)
   }
 
   test("alter database set dbproperties") {
@@ -110,10 +98,10 @@ class DDLCommandSuite extends PlanTest {
 
     val expected1 = AlterDatabaseProperties(
       "database_name",
-      Map("a" -> "a", "b" -> "b", "c" -> "c"))(sql1)
+      Map("a" -> "a", "b" -> "b", "c" -> "c"))
     val expected2 = AlterDatabaseProperties(
       "database_name",
-      Map("a" -> "a"))(sql2)
+      Map("a" -> "a"))
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
@@ -129,10 +117,10 @@ class DDLCommandSuite extends PlanTest {
 
     val expected1 = DescribeDatabase(
       "db_name",
-      extended = true)(sql1)
+      extended = true)
     val expected2 = DescribeDatabase(
       "db_name",
-      extended = false)(sql2)
+      extended = false)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..47c9a22acd44886d156fd3b9cf9672ace78b0d02
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
+import org.apache.spark.sql.catalyst.parser.ParserUtils._
+import org.apache.spark.sql.test.SharedSQLContext
+
+class DDLSuite extends QueryTest with SharedSQLContext {
+
+  /**
+   * Drops database `databaseName` after calling `f`.
+   */
+  private def withDatabase(dbNames: String*)(f: => Unit): Unit = {
+    try f finally {
+      dbNames.foreach { name =>
+        sqlContext.sql(s"DROP DATABASE IF EXISTS $name CASCADE")
+      }
+    }
+  }
+
+  test("Create/Drop Database") {
+    val catalog = sqlContext.sessionState.catalog
+
+    val databaseNames = Seq("db1", "`database`")
+
+    databaseNames.foreach { dbName =>
+      withDatabase(dbName) {
+        val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+
+        sql(s"CREATE DATABASE $dbName")
+        val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+        assert(db1 == CatalogDatabase(
+          dbNameWithoutBackTicks,
+          "",
+          System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+          Map.empty))
+        sql(s"DROP DATABASE $dbName CASCADE")
+        assert(!catalog.databaseExists(dbNameWithoutBackTicks))
+      }
+    }
+  }
+
+  test("Create Database - database already exists") {
+    val catalog = sqlContext.sessionState.catalog
+    val databaseNames = Seq("db1", "`database`")
+
+    databaseNames.foreach { dbName =>
+      val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+      withDatabase(dbName) {
+        sql(s"CREATE DATABASE $dbName")
+        val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+        assert(db1 == CatalogDatabase(
+          dbNameWithoutBackTicks,
+          "",
+          System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+          Map.empty))
+
+        val message = intercept[AnalysisException] {
+          sql(s"CREATE DATABASE $dbName")
+        }.getMessage
+        assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
+      }
+    }
+  }
+
+  test("Alter/Describe Database") {
+    val catalog = sqlContext.sessionState.catalog
+    val databaseNames = Seq("db1", "`database`")
+
+    databaseNames.foreach { dbName =>
+      withDatabase(dbName) {
+        val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+        val location =
+          System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
+        sql(s"CREATE DATABASE $dbName")
+
+        checkAnswer(
+          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+          Row("Database Name", dbNameWithoutBackTicks) ::
+            Row("Description", "") ::
+            Row("Location", location) ::
+            Row("Properties", "") :: Nil)
+
+        sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
+
+        checkAnswer(
+          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+          Row("Database Name", dbNameWithoutBackTicks) ::
+            Row("Description", "") ::
+            Row("Location", location) ::
+            Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
+
+        sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
+
+        checkAnswer(
+          sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+          Row("Database Name", dbNameWithoutBackTicks) ::
+            Row("Description", "") ::
+            Row("Location", location) ::
+            Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
+      }
+    }
+  }
+
+  test("Drop/Alter/Describe Database - database does not exists") {
+    val databaseNames = Seq("db1", "`database`")
+
+    databaseNames.foreach { dbName =>
+      val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+      assert(!sqlContext.sessionState.catalog.databaseExists(dbNameWithoutBackTicks))
+
+      var message = intercept[AnalysisException] {
+        sql(s"DROP DATABASE $dbName")
+      }.getMessage
+      assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist"))
+
+      message = intercept[AnalysisException] {
+        sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
+      }.getMessage
+      assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist"))
+
+      message = intercept[AnalysisException] {
+        sql(s"DESCRIBE DATABASE EXTENDED $dbName")
+      }.getMessage
+      assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist"))
+
+      sql(s"DROP DATABASE IF EXISTS $dbName")
+    }
+  }
+
+  // TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext
+}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 8e1ebe2937d232a1bc3023aa38ed7b087c2bfb94..7ad7f92bd25f761e56f8649b14beef7bec368203 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -183,7 +183,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
   test("Single command with --database") {
     runCliWithin(2.minute)(
       "CREATE DATABASE hive_test_db;"
-        -> "OK",
+        -> "",
       "USE hive_test_db;"
         -> "",
       "CREATE TABLE hive_test(key INT, val STRING);"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index ff12245e8ddd23e2d760c34bfa9063c6401a2a13..1cd783e63a25274aaae5ec6ae19459158a6a4661 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -59,6 +62,11 @@ class HiveSessionCatalog(
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
 
+  override def getDefaultDBPath(db: String): String = {
+    val defaultPath = context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
+    new Path(new Path(defaultPath), db + ".db").toString
+  }
+
   // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
   // essentially a cache for metastore tables. However, it relies on a lot of session-specific
   // things so it would be a lot of work to split its functionality between HiveSessionCatalog