From d74dee1336e7152cc0fb7d2b3bf1a44f4f452025 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Thu, 28 Sep 2017 09:20:37 -0700
Subject: [PATCH] [SPARK-22153][SQL] Rename ShuffleExchange ->
 ShuffleExchangeExec

## What changes were proposed in this pull request?
For some reason when we added the Exec suffix to all physical operators, we missed this one. I was looking for this physical operator today and couldn't find it, because I was looking for ExchangeExec.

## How was this patch tested?
This is a simple rename and should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #19376 from rxin/SPARK-22153.
---
 .../spark/sql/execution/SparkStrategies.scala |  6 +--
 .../exchange/EnsureRequirements.scala         | 26 ++++++-------
 .../exchange/ExchangeCoordinator.scala        | 38 +++++++++----------
 ...change.scala => ShuffleExchangeExec.scala} | 10 ++---
 .../apache/spark/sql/execution/limit.scala    |  6 +--
 .../streaming/IncrementalExecution.scala      |  4 +-
 .../apache/spark/sql/CachedTableSuite.scala   |  5 ++-
 .../org/apache/spark/sql/DataFrameSuite.scala | 10 ++---
 .../org/apache/spark/sql/DatasetSuite.scala   |  4 +-
 .../execution/ExchangeCoordinatorSuite.scala  | 22 +++++------
 .../spark/sql/execution/ExchangeSuite.scala   | 12 +++---
 .../spark/sql/execution/PlannerSuite.scala    | 32 ++++++++--------
 .../spark/sql/sources/BucketedReadSuite.scala | 10 ++---
 .../EnsureStatefulOpPartitioningSuite.scala   |  4 +-
 14 files changed, 95 insertions(+), 94 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/{ShuffleExchange.scala => ShuffleExchangeExec.scala} (98%)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4da7a73469..92eaab5cd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
@@ -411,7 +411,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
       case logical.Repartition(numPartitions, shuffle, child) =>
         if (shuffle) {
-          ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
+          ShuffleExchangeExec(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
         } else {
           execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
         }
@@ -446,7 +446,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case logical.RepartitionByExpression(expressions, child, numPartitions) =>
-        exchange.ShuffleExchange(HashPartitioning(
+        exchange.ShuffleExchangeExec(HashPartitioning(
           expressions, numPartitions), planLater(child)) :: Nil
       case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case r: LogicalRDD =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 1da72f2e92..d28ce60e27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql.internal.SQLConf
  * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
  * of input data meets the
  * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
- * each operator by inserting [[ShuffleExchange]] Operators where required.  Also ensure that the
- * input partition ordering requirements are met.
+ * each operator by inserting [[ShuffleExchangeExec]] Operators where required.  Also ensure that
+ * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
@@ -57,17 +57,17 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   }
 
   /**
-   * Adds [[ExchangeCoordinator]] to [[ShuffleExchange]]s if adaptive query execution is enabled
-   * and partitioning schemes of these [[ShuffleExchange]]s support [[ExchangeCoordinator]].
+   * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
+   * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
    */
   private def withExchangeCoordinator(
       children: Seq[SparkPlan],
       requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
     val supportsCoordinator =
-      if (children.exists(_.isInstanceOf[ShuffleExchange])) {
+      if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
         // Right now, ExchangeCoordinator only support HashPartitionings.
         children.forall {
-          case e @ ShuffleExchange(hash: HashPartitioning, _, _) => true
+          case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
           case child =>
             child.outputPartitioning match {
               case hash: HashPartitioning => true
@@ -94,7 +94,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
             targetPostShuffleInputSize,
             minNumPostShufflePartitions)
         children.zip(requiredChildDistributions).map {
-          case (e: ShuffleExchange, _) =>
+          case (e: ShuffleExchangeExec, _) =>
             // This child is an Exchange, we need to add the coordinator.
             e.copy(coordinator = Some(coordinator))
           case (child, distribution) =>
@@ -138,7 +138,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
             val targetPartitioning =
               createPartitioning(distribution, defaultNumPreShufflePartitions)
             assert(targetPartitioning.isInstanceOf[HashPartitioning])
-            ShuffleExchange(targetPartitioning, child, Some(coordinator))
+            ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
         }
       } else {
         // If we do not need ExchangeCoordinator, the original children are returned.
@@ -162,7 +162,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       case (child, BroadcastDistribution(mode)) =>
         BroadcastExchangeExec(mode, child)
       case (child, distribution) =>
-        ShuffleExchange(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
+        ShuffleExchangeExec(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
     }
 
     // If the operator has multiple children and specifies child output distributions (e.g. join),
@@ -215,8 +215,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
               child match {
                 // If child is an exchange, we replace it with
                 // a new one having targetPartitioning.
-                case ShuffleExchange(_, c, _) => ShuffleExchange(targetPartitioning, c)
-                case _ => ShuffleExchange(targetPartitioning, child)
+                case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(targetPartitioning, c)
+                case _ => ShuffleExchangeExec(targetPartitioning, child)
               }
           }
         }
@@ -246,9 +246,9 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   }
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case operator @ ShuffleExchange(partitioning, child, _) =>
+    case operator @ ShuffleExchangeExec(partitioning, child, _) =>
       child.children match {
-        case ShuffleExchange(childPartitioning, baseChild, _)::Nil =>
+        case ShuffleExchangeExec(childPartitioning, baseChild, _)::Nil =>
           if (childPartitioning.guarantees(partitioning)) child else operator
         case _ => operator
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
index 9fc4ffb651..78f11ca8d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
@@ -35,9 +35,9 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  *
  * A coordinator is constructed with three parameters, `numExchanges`,
  * `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`.
- *  - `numExchanges` is used to indicated that how many [[ShuffleExchange]]s that will be registered
- *    to this coordinator. So, when we start to do any actual work, we have a way to make sure that
- *    we have got expected number of [[ShuffleExchange]]s.
+ *  - `numExchanges` is used to indicated that how many [[ShuffleExchangeExec]]s that will be
+ *    registered to this coordinator. So, when we start to do any actual work, we have a way to
+ *    make sure that we have got expected number of [[ShuffleExchangeExec]]s.
  *  - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's
  *    input data size. With this parameter, we can estimate the number of post-shuffle partitions.
  *    This parameter is configured through
@@ -47,28 +47,28 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  *    partitions.
  *
  * The workflow of this coordinator is described as follows:
- *  - Before the execution of a [[SparkPlan]], for a [[ShuffleExchange]] operator,
+ *  - Before the execution of a [[SparkPlan]], for a [[ShuffleExchangeExec]] operator,
  *    if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator.
  *    This happens in the `doPrepare` method.
- *  - Once we start to execute a physical plan, a [[ShuffleExchange]] registered to this
+ *  - Once we start to execute a physical plan, a [[ShuffleExchangeExec]] registered to this
  *    coordinator will call `postShuffleRDD` to get its corresponding post-shuffle
  *    [[ShuffledRowRDD]].
- *    If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchange]]
+ *    If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchangeExec]]
  *    will immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
  *  - If this coordinator has not made the decision on how to shuffle data, it will ask those
- *    registered [[ShuffleExchange]]s to submit their pre-shuffle stages. Then, based on the
+ *    registered [[ShuffleExchangeExec]]s to submit their pre-shuffle stages. Then, based on the
  *    size statistics of pre-shuffle partitions, this coordinator will determine the number of
  *    post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices
  *    to a single post-shuffle partition whenever necessary.
  *  - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered
- *    [[ShuffleExchange]]s. So, when a [[ShuffleExchange]] calls `postShuffleRDD`, this coordinator
- *    can lookup the corresponding [[RDD]].
+ *    [[ShuffleExchangeExec]]s. So, when a [[ShuffleExchangeExec]] calls `postShuffleRDD`, this
+ *    coordinator can lookup the corresponding [[RDD]].
  *
  * The strategy used to determine the number of post-shuffle partitions is described as follows.
  * To determine the number of post-shuffle partitions, we have a target input size for a
  * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages
- * corresponding to the registered [[ShuffleExchange]]s, we will do a pass of those statistics and
- * pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
+ * corresponding to the registered [[ShuffleExchangeExec]]s, we will do a pass of those statistics
+ * and pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
  * adding another pre-shuffle partition would cause the size of a post-shuffle partition to be
  * greater than the target size.
  *
@@ -89,11 +89,11 @@ class ExchangeCoordinator(
   extends Logging {
 
   // The registered Exchange operators.
-  private[this] val exchanges = ArrayBuffer[ShuffleExchange]()
+  private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()
 
   // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
-  private[this] val postShuffleRDDs: JMap[ShuffleExchange, ShuffledRowRDD] =
-    new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)
+  private[this] val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
+    new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
 
   // A boolean that indicates if this coordinator has made decision on how to shuffle data.
   // This variable will only be updated by doEstimationIfNecessary, which is protected by
@@ -101,11 +101,11 @@ class ExchangeCoordinator(
   @volatile private[this] var estimated: Boolean = false
 
   /**
-   * Registers a [[ShuffleExchange]] operator to this coordinator. This method is only allowed to
-   * be called in the `doPrepare` method of a [[ShuffleExchange]] operator.
+   * Registers a [[ShuffleExchangeExec]] operator to this coordinator. This method is only allowed
+   * to be called in the `doPrepare` method of a [[ShuffleExchangeExec]] operator.
    */
   @GuardedBy("this")
-  def registerExchange(exchange: ShuffleExchange): Unit = synchronized {
+  def registerExchange(exchange: ShuffleExchangeExec): Unit = synchronized {
     exchanges += exchange
   }
 
@@ -200,7 +200,7 @@ class ExchangeCoordinator(
       // Make sure we have the expected number of registered Exchange operators.
       assert(exchanges.length == numExchanges)
 
-      val newPostShuffleRDDs = new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)
+      val newPostShuffleRDDs = new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
 
       // Submit all map stages
       val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]()
@@ -255,7 +255,7 @@ class ExchangeCoordinator(
     }
   }
 
-  def postShuffleRDD(exchange: ShuffleExchange): ShuffledRowRDD = {
+  def postShuffleRDD(exchange: ShuffleExchangeExec): ShuffledRowRDD = {
     doEstimationIfNecessary()
 
     if (!postShuffleRDDs.containsKey(exchange)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
similarity index 98%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 0d06d83fb2..11c4aa9b4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.MutablePair
 /**
  * Performs a shuffle that will result in the desired `newPartitioning`.
  */
-case class ShuffleExchange(
+case class ShuffleExchangeExec(
     var newPartitioning: Partitioning,
     child: SparkPlan,
     @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
@@ -84,7 +84,7 @@ case class ShuffleExchange(
    */
   private[exchange] def prepareShuffleDependency()
     : ShuffleDependency[Int, InternalRow, InternalRow] = {
-    ShuffleExchange.prepareShuffleDependency(
+    ShuffleExchangeExec.prepareShuffleDependency(
       child.execute(), child.output, newPartitioning, serializer)
   }
 
@@ -129,9 +129,9 @@ case class ShuffleExchange(
   }
 }
 
-object ShuffleExchange {
-  def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchange = {
-    ShuffleExchange(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
+object ShuffleExchangeExec {
+  def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchangeExec = {
+    ShuffleExchangeExec(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 1f515e29b4..13da4b26a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.util.Utils
 
 /**
@@ -40,7 +40,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
     val shuffled = new ShuffledRowRDD(
-      ShuffleExchange.prepareShuffleDependency(
+      ShuffleExchangeExec.prepareShuffleDependency(
         locallyLimited, child.output, SinglePartition, serializer))
     shuffled.mapPartitionsInternal(_.take(limit))
   }
@@ -153,7 +153,7 @@ case class TakeOrderedAndProjectExec(
       }
     }
     val shuffled = new ShuffledRowRDD(
-      ShuffleExchange.prepareShuffleDependency(
+      ShuffleExchangeExec.prepareShuffleDependency(
         localTopK, child.output, SinglePartition, serializer))
     shuffled.mapPartitions { iter =>
       val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 8e0aae39ca..82f879c763 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.streaming.OutputMode
 
 /**
@@ -155,7 +155,7 @@ object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
             child.execute().getNumPartitions == expectedPartitioning.numPartitions) {
           child
         } else {
-          ShuffleExchange(expectedPartitioning, child)
+          ShuffleExchangeExec(expectedPartitioning, child)
         }
       }
       so.withNewChildren(children)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 3e4f619431..1e52445f28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.columnar._
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
@@ -420,7 +420,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
    * Verifies that the plan for `df` contains `expected` number of Exchange operators.
    */
   private def verifyNumExchanges(df: DataFrame, expected: Int): Unit = {
-    assert(df.queryExecution.executedPlan.collect { case e: ShuffleExchange => e }.size == expected)
+    assert(
+      df.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size == expected)
   }
 
   test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 6178661cf7..0e2f2e5a19 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union}
 import org.apache.spark.sql.execution.{FilterExec, QueryExecution}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext}
@@ -1529,7 +1529,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
           fail("Should not have back to back Aggregates")
         }
         atFirstAgg = true
-      case e: ShuffleExchange => atFirstAgg = false
+      case e: ShuffleExchangeExec => atFirstAgg = false
       case _ =>
     }
   }
@@ -1710,19 +1710,19 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       val plan = join.queryExecution.executedPlan
       checkAnswer(join, df)
       assert(
-        join.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
+        join.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => true }.size === 1)
       assert(
         join.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size === 1)
       val broadcasted = broadcast(join)
       val join2 = join.join(broadcasted, "id").join(broadcasted, "id")
       checkAnswer(join2, df)
       assert(
-        join2.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
+        join2.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => true }.size == 1)
       assert(
         join2.queryExecution.executedPlan
           .collect { case e: BroadcastExchangeExec => true }.size === 1)
       assert(
-        join2.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size === 4)
+        join2.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size == 4)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 5015f3709f..dace6825ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -1206,7 +1206,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       val agg = cp.groupBy('id % 2).agg(count('id))
 
       agg.queryExecution.executedPlan.collectFirst {
-        case ShuffleExchange(_, _: RDDScanExec, _) =>
+        case ShuffleExchangeExec(_, _: RDDScanExec, _) =>
         case BroadcastExchangeExec(_, _: RDDScanExec) =>
       }.foreach { _ =>
         fail(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
index f1b5e3be5b..737eeb0af5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite}
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ShuffleExchangeExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 
@@ -300,13 +300,13 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val exchanges = agg.queryExecution.executedPlan.collect {
-          case e: ShuffleExchange => e
+          case e: ShuffleExchangeExec => e
         }
         assert(exchanges.length === 1)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 5)
               case o =>
@@ -314,7 +314,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
           case None =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 3)
               case o =>
@@ -351,13 +351,13 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchange => e
+          case e: ShuffleExchangeExec => e
         }
         assert(exchanges.length === 2)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 5)
               case o =>
@@ -365,7 +365,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
           case None =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 2)
               case o =>
@@ -407,13 +407,13 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchange => e
+          case e: ShuffleExchangeExec => e
         }
         assert(exchanges.length === 4)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 5)
               case o =>
@@ -459,13 +459,13 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
         val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchange => e
+          case e: ShuffleExchangeExec => e
         }
         assert(exchanges.length === 3)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
             exchanges.foreach {
-              case e: ShuffleExchange =>
+              case e: ShuffleExchangeExec =>
                 assert(e.coordinator.isDefined)
                 assert(e.outputPartitioning.numPartitions === 5)
               case o =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
index 59eaf4d1c2..aac8d56ba6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, IdentityBroadcastMode, SinglePartition}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -31,7 +31,7 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
     val input = (1 to 1000).map(Tuple1.apply)
     checkAnswer(
       input.toDF(),
-      plan => ShuffleExchange(SinglePartition, plan),
+      plan => ShuffleExchangeExec(SinglePartition, plan),
       input.map(Row.fromTuple)
     )
   }
@@ -81,12 +81,12 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
     assert(plan sameResult plan)
 
     val part1 = HashPartitioning(output, 1)
-    val exchange1 = ShuffleExchange(part1, plan)
-    val exchange2 = ShuffleExchange(part1, plan)
+    val exchange1 = ShuffleExchangeExec(part1, plan)
+    val exchange2 = ShuffleExchangeExec(part1, plan)
     val part2 = HashPartitioning(output, 2)
-    val exchange3 = ShuffleExchange(part2, plan)
+    val exchange3 = ShuffleExchangeExec(part2, plan)
     val part3 = HashPartitioning(output ++ output, 2)
-    val exchange4 = ShuffleExchange(part3, plan)
+    val exchange4 = ShuffleExchangeExec(part3, plan)
     val exchange5 = ReusedExchangeExec(output, exchange4)
 
     assert(exchange1 sameResult exchange1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 63e17c7f37..86066362da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter,
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -214,7 +214,7 @@ class PlannerSuite extends SharedSQLContext {
               |  JOIN tiny ON (small.key = tiny.key)
             """.stripMargin
           ).queryExecution.executedPlan.collect {
-            case exchange: ShuffleExchange => exchange
+            case exchange: ShuffleExchangeExec => exchange
           }.length
           assert(numExchanges === 5)
         }
@@ -229,7 +229,7 @@ class PlannerSuite extends SharedSQLContext {
               |  JOIN tiny ON (normal.key = tiny.key)
             """.stripMargin
           ).queryExecution.executedPlan.collect {
-            case exchange: ShuffleExchange => exchange
+            case exchange: ShuffleExchangeExec => exchange
           }.length
           assert(numExchanges === 5)
         }
@@ -300,7 +300,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case e: ShuffleExchange => true }.isEmpty) {
+    if (outputPlan.collect { case e: ShuffleExchangeExec => true }.isEmpty) {
       fail(s"Exchange should have been added:\n$outputPlan")
     }
   }
@@ -338,7 +338,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case e: ShuffleExchange => true }.isEmpty) {
+    if (outputPlan.collect { case e: ShuffleExchangeExec => true }.isEmpty) {
       fail(s"Exchange should have been added:\n$outputPlan")
     }
   }
@@ -358,7 +358,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case e: ShuffleExchange => true }.nonEmpty) {
+    if (outputPlan.collect { case e: ShuffleExchangeExec => true }.nonEmpty) {
       fail(s"Exchange should not have been added:\n$outputPlan")
     }
   }
@@ -381,7 +381,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case e: ShuffleExchange => true }.nonEmpty) {
+    if (outputPlan.collect { case e: ShuffleExchangeExec => true }.nonEmpty) {
       fail(s"No Exchanges should have been added:\n$outputPlan")
     }
   }
@@ -391,7 +391,7 @@ class PlannerSuite extends SharedSQLContext {
     val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 5)
     val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
     assert(!childPartitioning.satisfies(distribution))
-    val inputPlan = ShuffleExchange(finalPartitioning,
+    val inputPlan = ShuffleExchangeExec(finalPartitioning,
       DummySparkPlan(
         children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
         requiredChildDistribution = Seq(distribution),
@@ -400,7 +400,7 @@ class PlannerSuite extends SharedSQLContext {
 
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case e: ShuffleExchange => true }.size == 2) {
+    if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 2) {
       fail(s"Topmost Exchange should have been eliminated:\n$outputPlan")
     }
   }
@@ -411,7 +411,7 @@ class PlannerSuite extends SharedSQLContext {
     val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 8)
     val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
     assert(!childPartitioning.satisfies(distribution))
-    val inputPlan = ShuffleExchange(finalPartitioning,
+    val inputPlan = ShuffleExchangeExec(finalPartitioning,
       DummySparkPlan(
         children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
         requiredChildDistribution = Seq(distribution),
@@ -420,7 +420,7 @@ class PlannerSuite extends SharedSQLContext {
 
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case e: ShuffleExchange => true }.size == 1) {
+    if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 1) {
       fail(s"Topmost Exchange should not have been eliminated:\n$outputPlan")
     }
   }
@@ -430,7 +430,7 @@ class PlannerSuite extends SharedSQLContext {
     val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 5)
     val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
     assert(!childPartitioning.satisfies(distribution))
-    val shuffle = ShuffleExchange(finalPartitioning,
+    val shuffle = ShuffleExchangeExec(finalPartitioning,
       DummySparkPlan(
         children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
         requiredChildDistribution = Seq(distribution),
@@ -449,7 +449,7 @@ class PlannerSuite extends SharedSQLContext {
     if (outputPlan.collect { case e: ReusedExchangeExec => true }.size != 1) {
       fail(s"Should re-use the shuffle:\n$outputPlan")
     }
-    if (outputPlan.collect { case e: ShuffleExchange => true }.size != 1) {
+    if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size != 1) {
       fail(s"Should have only one shuffle:\n$outputPlan")
     }
 
@@ -459,14 +459,14 @@ class PlannerSuite extends SharedSQLContext {
       Literal(1) :: Nil,
       Inner,
       None,
-      ShuffleExchange(finalPartitioning, inputPlan),
-      ShuffleExchange(finalPartitioning, inputPlan))
+      ShuffleExchangeExec(finalPartitioning, inputPlan),
+      ShuffleExchangeExec(finalPartitioning, inputPlan))
 
     val outputPlan2 = ReuseExchange(spark.sessionState.conf).apply(inputPlan2)
     if (outputPlan2.collect { case e: ReusedExchangeExec => true }.size != 2) {
       fail(s"Should re-use the two shuffles:\n$outputPlan2")
     }
-    if (outputPlan2.collect { case e: ShuffleExchange => true }.size != 2) {
+    if (outputPlan2.collect { case e: ShuffleExchangeExec => true }.size != 2) {
       fail(s"Should have only two shuffles:\n$outputPlan")
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index eb9e6458fc..ab18905e2d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -302,10 +302,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
 
         // check existence of shuffle
         assert(
-          joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft,
+          joinOperator.left.find(_.isInstanceOf[ShuffleExchangeExec]).isDefined == shuffleLeft,
           s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}")
         assert(
-          joinOperator.right.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleRight,
+          joinOperator.right.find(_.isInstanceOf[ShuffleExchangeExec]).isDefined == shuffleRight,
           s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}")
 
         // check existence of sort
@@ -506,7 +506,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         agged.sort("i", "j"),
         df1.groupBy("i", "j").agg(max("k")).sort("i", "j"))
 
-      assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchange]).isEmpty)
+      assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty)
     }
   }
 
@@ -520,7 +520,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         agged.sort("i", "j"),
         df1.groupBy("i", "j").agg(max("k")).sort("i", "j"))
 
-      assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchange]).isEmpty)
+      assert(agged.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
index 044bb03480..ed9823fbdd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
-import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -93,7 +93,7 @@ class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLCont
         fail(s"Was expecting an exchange but didn't get one in:\n$executed")
       }
       assert(exchange.get ===
-        ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), inputPlan),
+        ShuffleExchangeExec(expectedPartitioning(inputPlan.output.take(1)), inputPlan),
         s"Exchange didn't have expected properties:\n${exchange.get}")
     } else {
       assert(!executed.children.exists(_.isInstanceOf[Exchange]),
-- 
GitLab