diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c6f5cf641b8d535a552f7712b364efc16c4adb54..1739b0cfa2761417fa5e5e680778b69b2c3bf0eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -552,6 +552,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")
+    .internal()
+    .doc("When true, the planner will try to find out duplicated subqueries and re-use them.")
+    .booleanConf
+    .createWithDefault(true)
+
   val STATE_STORE_PROVIDER_CLASS =
     buildConf("spark.sql.streaming.stateStore.providerClass")
       .internal()
@@ -932,6 +938,8 @@ class SQLConf extends Serializable with Logging {
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
+  def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
+
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 85096dcc40f5d2480d716e1b945f63257b29b5f1..f69a688555bbf3a90160fe14686febcc79d0ad5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -595,6 +595,9 @@ case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends Spa
  */
 case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
+  // Ignore this wrapper for canonicalizing.
+  override lazy val canonicalized: SparkPlan = child.canonicalized
+
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
     "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index d11045fb6ac8c5c72596d32cd5059e5de3473b8b..2abeadfe4536299ba3c183e90734b109e4a87f41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
 case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.exchangeReuseEnabled) {
+    if (!conf.subqueryReuseEnabled) {
       return plan
     }
     // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b525c9e80ba421c79b51d7cfd7080c6eb8c88beb..41e9e2c92ca8ef061c52fb4f8ea0f6dc9c891c97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -23,9 +23,12 @@ import java.net.{MalformedURLException, URL}
 import java.sql.Timestamp
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.execution.{ScalarSubquery, SubqueryExec}
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
@@ -700,6 +703,38 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         row => Seq.fill(16)(Row.merge(row, row))).collect().toSeq)
   }
 
+  test("Verify spark.sql.subquery.reuse") {
+    Seq(true, false).foreach { reuse =>
+      withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
+        val df = sql(
+          """
+            |SELECT key, (SELECT avg(key) FROM testData)
+            |FROM testData
+            |WHERE key > (SELECT avg(key) FROM testData)
+            |ORDER BY key
+            |LIMIT 3
+          """.stripMargin)
+
+        checkAnswer(df, Row(51, 50.5) :: Row(52, 50.5) :: Row(53, 50.5) :: Nil)
+
+        val subqueries = ArrayBuffer.empty[SubqueryExec]
+        df.queryExecution.executedPlan.transformAllExpressions {
+          case s @ ScalarSubquery(plan: SubqueryExec, _) =>
+            subqueries += plan
+            s
+        }
+
+        assert(subqueries.size == 2, "Two ScalarSubquery are expected in the plan")
+
+        if (reuse) {
+          assert(subqueries.distinct.size == 1, "Only one ScalarSubquery exists in the plan")
+        } else {
+          assert(subqueries.distinct.size == 2, "Reuse is not expected")
+        }
+      }
+    }
+  }
+
   test("cartesian product join") {
     withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
       checkAnswer(