diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 09c34b7059fc33b94d5169c82fb725b27c6ceb27..4802e4059580750287b3fa383bcf1983efb1ac2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -46,12 +46,15 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una case HashPartitioning(expressions, numPartitions) => // TODO: Eliminate redundant expressions in grouping key and value. val rdd = child.execute().mapPartitions { iter => - @transient val hashExpressions = - newMutableProjection(expressions, child.output)() - if (sortBasedShuffleOn) { + @transient val hashExpressions = + newProjection(expressions, child.output) + iter.map(r => (hashExpressions(r), r.copy())) } else { + @transient val hashExpressions = + newMutableProjection(expressions, child.output)() + val mutablePair = new MutablePair[Row, Row]() iter.map(r => mutablePair.update(hashExpressions(r), r)) }