Commit 8f0a3d5b authored by gatorsmile, committed by Reynold Xin

[SPARK-15330][SQL] Implement Reset Command

#### What changes were proposed in this pull request?
Like the `SET` command, `RESET` is also supported by Hive. See the Hive CLI manual: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli

Below is the related Hive JIRA: https://issues.apache.org/jira/browse/HIVE-3202

This PR implements such a command for resetting the SQL-related configurations to their default values. One of the use cases described in HIVE-3202 is quoted below:

> For the purpose of optimization we set various configs per query. It's worthy but all those configs should be reset every time for next query.
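
A minimal sketch of the intended usage (assuming a Spark 2.0-era local `SparkSession`; the shuffle-partitions key and its default of 200 are used purely for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("reset-demo").getOrCreate()

spark.sql("SET spark.sql.shuffle.partitions=10")  // per-query tuning
assert(spark.conf.get("spark.sql.shuffle.partitions") == "10")

spark.sql("RESET")  // drop every session-level override in one statement
// Reads now fall back to the built-in default (200 at the time of this commit).
assert(spark.conf.get("spark.sql.shuffle.partitions") == "200")
```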

#### How was this patch tested?
Added a test case.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #13121 from gatorsmile/resetCommand.
parent c18fa464
@@ -120,6 +120,7 @@ statement
     | ADD identifier .*? #addResource
     | SET ROLE .*? #failNativeCommand
     | SET .*? #setConfiguration
+    | RESET #resetConfiguration
     | unsupportedHiveNativeCommands .*? #failNativeCommand
     ;
@@ -633,7 +634,7 @@ nonReserved
     | GROUPING | CUBE | ROLLUP
     | EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN
     | TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF
-    | SET
+    | SET | RESET
     | VIEW | REPLACE
     | IF
     | NO | DATA
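
Because `RESET` is added to `nonReserved`, the word remains usable as an ordinary identifier, so pre-existing queries that use it (say, as a column alias) should still parse. A hedged sketch:

```scala
// `reset` as a column alias still parses, since RESET is a non-reserved keyword.
spark.sql("SELECT 1 AS reset").collect()
```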
@@ -748,6 +749,7 @@ MAP: 'MAP';
 STRUCT: 'STRUCT';
 COMMENT: 'COMMENT';
 SET: 'SET';
+RESET: 'RESET';
 DATA: 'DATA';
 START: 'START';
 TRANSACTION: 'TRANSACTION';
@@ -75,6 +75,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
   }
 
+  /**
+   * Create a [[ResetCommand]] logical plan.
+   * Example SQL:
+   * {{{
+   *   RESET;
+   * }}}
+   */
+  override def visitResetConfiguration(
+      ctx: ResetConfigurationContext): LogicalPlan = withOrigin(ctx) {
+    ResetCommand
+  }
+
   /**
    * Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN
    * option (other options are passed on to Hive) e.g.:
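
As a quick sanity check that the grammar and the builder wire together, parsing the bare statement should yield the new command object. This is a hedged sketch using the session's internal parser (a `private[sql]` API in this era, so it would only compile from Spark's own packages or test code):

```scala
import org.apache.spark.sql.execution.command.ResetCommand

// "RESET" should round-trip through the parser to the ResetCommand plan.
val plan = spark.sessionState.sqlParser.parsePlan("RESET")
assert(plan == ResetCommand)  // ResetCommand is a case object, so equality is identity
```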
@@ -116,3 +116,19 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand
   override def run(sparkSession: SparkSession): Seq[Row] = runFunc(sparkSession)
 }
+
+/**
+ * This command resets SQLConf to its default values. It is invoked by
+ * {{{
+ *   reset;
+ * }}}
+ */
+case object ResetCommand extends RunnableCommand with Logging {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.conf.clear()
+    Seq.empty[Row]
+  }
+
+  override val output: Seq[Attribute] = Seq.empty
+}
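
Because `run` simply clears the session's `SQLConf`, user-defined entries are removed outright rather than reverted to a saved value, while built-in entries fall back to their defaults. A sketch mirroring the tests below (the key name is made up for illustration):

```scala
spark.conf.set("x.y.z.demo", "false")  // hypothetical user-defined key
assert(spark.conf.get("x.y.z.demo") == "false")

spark.sql("RESET")
assert(spark.conf.getOption("x.y.z.demo").isEmpty)  // gone, not restored
```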
@@ -99,7 +99,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
   test("deprecated property") {
     spark.sqlContext.conf.clear()
     val original = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
-    try{
+    try {
       sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
       assert(spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) === 10)
     } finally {
@@ -107,6 +107,53 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("reset - public conf") {
+    spark.sqlContext.conf.clear()
+    val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL)
+    try {
+      assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === true)
+      sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=false")
+      assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === false)
+      assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 1)
+      sql(s"reset")
+      assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === true)
+      assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 0)
+    } finally {
+      sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=$original")
+    }
+  }
+
+  test("reset - internal conf") {
+    spark.sqlContext.conf.clear()
+    val original = spark.conf.get(SQLConf.NATIVE_VIEW)
+    try {
+      assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true)
+      sql(s"set ${SQLConf.NATIVE_VIEW.key}=false")
+      assert(spark.conf.get(SQLConf.NATIVE_VIEW) === false)
+      assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() == 1)
+      sql(s"reset")
+      assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true)
+      assert(sql(s"set").where(s"key = '${SQLConf.NATIVE_VIEW.key}'").count() == 0)
+    } finally {
+      sql(s"set ${SQLConf.NATIVE_VIEW.key}=$original")
+    }
+  }
+
+  test("reset - user-defined conf") {
+    spark.sqlContext.conf.clear()
+    val userDefinedConf = "x.y.z.reset"
+    try {
+      assert(spark.conf.getOption(userDefinedConf).isEmpty)
+      sql(s"set $userDefinedConf=false")
+      assert(spark.conf.get(userDefinedConf) === "false")
+      assert(sql(s"set").where(s"key = '$userDefinedConf'").count() == 1)
+      sql(s"reset")
+      assert(spark.conf.getOption(userDefinedConf).isEmpty)
+    } finally {
+      spark.conf.unset(userDefinedConf)
+    }
+  }
+
   test("invalid conf value") {
     spark.sqlContext.conf.clear()
     val e = intercept[IllegalArgumentException] {
@@ -32,8 +32,8 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, CommandProcessor,
-  CommandProcessorFactory, SetProcessor}
+import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, CommandProcessor}
+import org.apache.hadoop.hive.ql.processors.{CommandProcessorFactory, ResetProcessor, SetProcessor}
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.thrift.transport.TSocket

@@ -312,7 +312,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
       if (proc != null) {
         // scalastyle:off println
         if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] ||
-          proc.isInstanceOf[AddResourceProcessor]) {
+          proc.isInstanceOf[AddResourceProcessor] || proc.isInstanceOf[ResetProcessor]) {
           val driver = new SparkSQLDriver
           driver.init()