diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3ea808926e10b54a746567e9c4d559b8cd914fea..9f7c760fb9d21a6bad1e9c046ce95a1c1df4fee1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -552,12 +552,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse") - .internal() - .doc("When true, the planner will try to find out duplicated subqueries and re-use them.") - .booleanConf - .createWithDefault(true) - val STATE_STORE_PROVIDER_CLASS = buildConf("spark.sql.streaming.stateStore.providerClass") .internal() @@ -938,8 +932,6 @@ class SQLConf extends Serializable with Logging { def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED) - def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED) - def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE) def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 04c130314388ac47e29dbe08f595fe89467cb97b..bd7a5c5d914c17155f933732623f98c4a4b31b23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -599,9 +599,6 @@ case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends Spa */ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode { - // Ignore this wrapper for canonicalizing. - override lazy val canonicalized: SparkPlan = child.canonicalized - override lazy val metrics = Map( "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"), "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 2abeadfe4536299ba3c183e90734b109e4a87f41..d11045fb6ac8c5c72596d32cd5059e5de3473b8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] { case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = { - if (!conf.subqueryReuseEnabled) { + if (!conf.exchangeReuseEnabled) { return plan } // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index a7efcafa0166ac2930ce267c4cbc0446ce68dece..68f61cfab6d2faf605ccf020b15b03536277eea5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -23,12 +23,9 @@ import java.net.{MalformedURLException, URL} import java.sql.Timestamp import java.util.concurrent.atomic.AtomicBoolean -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.{AccumulatorSuite, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.util.StringUtils -import org.apache.spark.sql.execution.{ScalarSubquery, SubqueryExec} import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ @@ -703,38 +700,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { row => Seq.fill(16)(Row.merge(row, row))).collect().toSeq) } - test("Verify spark.sql.subquery.reuse") { - Seq(true, false).foreach { reuse => - withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) { - val df = sql( - """ - |SELECT key, (SELECT avg(key) FROM testData) - |FROM testData - |WHERE key > (SELECT avg(key) FROM testData) - |ORDER BY key - |LIMIT 3 - """.stripMargin) - - checkAnswer(df, Row(51, 50.5) :: Row(52, 50.5) :: Row(53, 50.5) :: Nil) - - val subqueries = ArrayBuffer.empty[SubqueryExec] - df.queryExecution.executedPlan.transformAllExpressions { - case s @ ScalarSubquery(plan: SubqueryExec, _) => - subqueries += plan - s - } - - assert(subqueries.size == 2, "Two ScalarSubquery are expected in the plan") - - if (reuse) { - assert(subqueries.distinct.size == 1, "Only one ScalarSubquery exists in the plan") - } else { - assert(subqueries.distinct.size == 2, "Reuse is not expected") - } - } - } - } - test("cartesian product join") { withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { checkAnswer(