diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index cd521c52d1b2100e5fc8184baf69d74fc188e5dd..8fea46a58e85770b2756b195cd6b6a632509bf52 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -63,8 +63,6 @@ public final class UnsafeFixedWidthAggregationMap {
    */
   private final UnsafeRow currentAggregationBuffer;
 
-  private final boolean enablePerfMetrics;
-
   /**
    * @return true if UnsafeFixedWidthAggregationMap supports aggregation buffers with the given
    *         schema, false otherwise.
@@ -87,7 +85,6 @@ public final class UnsafeFixedWidthAggregationMap {
    * @param taskMemoryManager the memory manager used to allocate our Unsafe memory structures.
    * @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
    * @param pageSizeBytes the data page size, in bytes; limits the maximum record size.
-   * @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
    */
   public UnsafeFixedWidthAggregationMap(
       InternalRow emptyAggregationBuffer,
@@ -95,15 +92,13 @@ public final class UnsafeFixedWidthAggregationMap {
       StructType groupingKeySchema,
       TaskMemoryManager taskMemoryManager,
       int initialCapacity,
-      long pageSizeBytes,
-      boolean enablePerfMetrics) {
+      long pageSizeBytes) {
     this.aggregationBufferSchema = aggregationBufferSchema;
     this.currentAggregationBuffer = new UnsafeRow(aggregationBufferSchema.length());
     this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
     this.groupingKeySchema = groupingKeySchema;
     this.map =
-      new BytesToBytesMap(taskMemoryManager, initialCapacity, pageSizeBytes, enablePerfMetrics);
-    this.enablePerfMetrics = enablePerfMetrics;
+      new BytesToBytesMap(taskMemoryManager, initialCapacity, pageSizeBytes, true);
 
     // Initialize the buffer for aggregation value
     final UnsafeProjection valueProjection = UnsafeProjection.create(aggregationBufferSchema);
@@ -223,15 +218,11 @@ public final class UnsafeFixedWidthAggregationMap {
     map.free();
   }
 
-  @SuppressWarnings("UseOfSystemOutOrSystemErr")
-  public void printPerfMetrics() {
-    if (!enablePerfMetrics) {
-      throw new IllegalStateException("Perf metrics not enabled");
-    }
-    System.out.println("Average probes per lookup: " + map.getAverageProbesPerLookup());
-    System.out.println("Number of hash collisions: " + map.getNumHashCollisions());
-    System.out.println("Time spent resizing (ns): " + map.getTimeSpentResizingNs());
-    System.out.println("Total memory consumption (bytes): " + map.getTotalMemoryConsumption());
+  /**
+   * Gets the average number of hash map probes per lookup for the underlying `BytesToBytesMap`.
+   */
+  public double getAverageProbesPerLookup() {
+    return map.getAverageProbesPerLookup();
   }
 
   /**
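For context, `getAverageProbesPerLookup()` just exposes a ratio that `BytesToBytesMap` maintains while servicing lookups. A minimal sketch of that bookkeeping, assuming two internal counters (the names `numProbes` and `numKeyLookups` are illustrative, not necessarily the map's actual fields):

```scala
// Illustrative sketch only: simplified probe accounting in the spirit of
// BytesToBytesMap. The field names are assumptions, not the real internals.
class ProbeTracker {
  private var numProbes: Long = 0L      // total slots inspected across all lookups
  private var numKeyLookups: Long = 0L  // total number of lookup calls

  def recordLookup(probesForThisKey: Int): Unit = {
    numKeyLookups += 1
    numProbes += probesForThisKey
  }

  // An average of exactly 1.0 means every key was found at its first slot;
  // anything above 1.0 indicates hash collisions and extra probing.
  def averageProbesPerLookup: Double =
    if (numKeyLookups == 0) 0.0 else numProbes.toDouble / numKeyLookups
}
```

Making the map always track these counters (the hard-coded `true` in the constructor call above) is what lets the new metric be reported unconditionally.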
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 68c8e6ce62cbb9fa9cecdab2b4c1a13882115235..9df5e58f70addcb5754edeb05db62186599a21e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -59,7 +59,8 @@ case class HashAggregateExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
-    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"))
+    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"),
+    "avgHashmapProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hashmap probe"))
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
@@ -93,6 +94,7 @@ case class HashAggregateExec(
     val numOutputRows = longMetric("numOutputRows")
     val peakMemory = longMetric("peakMemory")
     val spillSize = longMetric("spillSize")
+    val avgHashmapProbe = longMetric("avgHashmapProbe")
 
     child.execute().mapPartitions { iter =>
 
@@ -116,7 +118,8 @@ case class HashAggregateExec(
             testFallbackStartsAt,
             numOutputRows,
             peakMemory,
-            spillSize)
+            spillSize,
+            avgHashmapProbe)
         if (!hasInput && groupingExpressions.isEmpty) {
           numOutputRows += 1
           Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
@@ -157,7 +160,7 @@ case class HashAggregateExec(
     }
   }
 
-  // The variables used as aggregation buffer
+  // The variables used as the aggregation buffer. Only used for aggregation without keys.
   private var bufVars: Seq[ExprCode] = _
 
   private def doProduceWithoutKeys(ctx: CodegenContext): String = {
@@ -312,8 +315,7 @@ case class HashAggregateExec(
       groupingKeySchema,
       TaskContext.get().taskMemoryManager(),
       1024 * 16, // initial capacity
-      TaskContext.get().taskMemoryManager().pageSizeBytes,
-      false // disable tracking of performance metrics
+      TaskContext.get().taskMemoryManager().pageSizeBytes
     )
   }
 
@@ -341,7 +343,8 @@ case class HashAggregateExec(
       hashMap: UnsafeFixedWidthAggregationMap,
       sorter: UnsafeKVExternalSorter,
       peakMemory: SQLMetric,
-      spillSize: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {
+      spillSize: SQLMetric,
+      avgHashmapProbe: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {
 
     // update peak execution memory
     val mapMemory = hashMap.getPeakMemoryUsedBytes
@@ -351,6 +354,10 @@ case class HashAggregateExec(
     peakMemory.add(maxMemory)
     metrics.incPeakExecutionMemory(maxMemory)
 
+    // Update the average hash map probe; SQLMetric stores long values, so round the average up.
+    val avgProbes = hashMap.getAverageProbesPerLookup()
+    avgHashmapProbe.add(avgProbes.ceil.toLong)
+
     if (sorter == null) {
       // not spilled
       return hashMap.iterator()
@@ -577,6 +584,7 @@ case class HashAggregateExec(
     val doAgg = ctx.freshName("doAggregateWithKeys")
     val peakMemory = metricTerm(ctx, "peakMemory")
     val spillSize = metricTerm(ctx, "spillSize")
+    val avgHashmapProbe = metricTerm(ctx, "avgHashmapProbe")
 
     def generateGenerateCode(): String = {
       if (isFastHashMapEnabled) {
@@ -602,7 +610,8 @@ case class HashAggregateExec(
           ${if (isFastHashMapEnabled) {
               s"$iterTermForFastHashMap = $fastHashMapTerm.rowIterator();"} else ""}
 
-          $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize);
+          $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize,
+            $avgHashmapProbe);
         }
        """)
 
@@ -792,6 +801,8 @@ case class HashAggregateExec(
          |     $unsafeRowBuffer =
          |       $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
          |   }
+         |   // Can't allocate a buffer from the hash map. Spill the map and fall back to
+         |   // sort-based aggregation after processing all input rows.
          |   if ($unsafeRowBuffer == null) {
          |     if ($sorterTerm == null) {
          |       $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
@@ -800,7 +811,7 @@ case class HashAggregateExec(
          |     }
          |     $resetCounter
-         |     // the hash map had be spilled, it should have enough memory now,
-         |     // try  to allocate buffer again.
+         |     // the hash map has been spilled, it should have enough memory now,
+         |     // try to allocate buffer again.
          |     $unsafeRowBuffer =
          |       $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
          |     if ($unsafeRowBuffer == null) {
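Since `SQLMetric` is a long accumulator, `finishAggregate` rounds the per-task average up before recording it. A small sketch of that conversion; the helper `toMetricValue` is hypothetical, mirroring the `avgProbes.ceil.toLong` call above:

```scala
// Hypothetical helper showing why the ceiling is taken rather than truncating:
// SQLMetric stores longs, and truncation would report a collision-free-looking
// value of 1 for any average below 2.0.
def toMetricValue(avgProbes: Double): Long = avgProbes.ceil.toLong

assert(toMetricValue(1.0) == 1L) // exactly one probe per lookup: no collisions
assert(toMetricValue(1.2) == 2L) // fractional excess rounds up, preserving the signal
```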
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 2988161ee5e7b70d438e3fe06233fc1ac200057e..8efa95d48aea081e2040e1250cff18424a72f095 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -88,7 +88,8 @@ class TungstenAggregationIterator(
     testFallbackStartsAt: Option[(Int, Int)],
     numOutputRows: SQLMetric,
     peakMemory: SQLMetric,
-    spillSize: SQLMetric)
+    spillSize: SQLMetric,
+    avgHashmapProbe: SQLMetric)
   extends AggregationIterator(
     groupingExpressions,
     originalInputAttributes,
@@ -162,8 +163,7 @@ class TungstenAggregationIterator(
     StructType.fromAttributes(groupingExpressions.map(_.toAttribute)),
     TaskContext.get().taskMemoryManager(),
     1024 * 16, // initial capacity
-    TaskContext.get().taskMemoryManager().pageSizeBytes,
-    false // disable tracking of performance metrics
+    TaskContext.get().taskMemoryManager().pageSizeBytes
   )
 
   // The function used to read and process input rows. When processing input rows,
@@ -420,6 +420,10 @@ class TungstenAggregationIterator(
         peakMemory += maxMemory
         spillSize += metrics.memoryBytesSpilled - spillSizeBefore
         metrics.incPeakExecutionMemory(maxMemory)
+
+        // This is the last record, so update the average hash map probe metric now.
+        val averageProbes = hashMap.getAverageProbesPerLookup()
+        avgHashmapProbe.add(averageProbes.ceil.toLong)
       }
       numOutputRows += 1
       res
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index ef982a4ebd10d14461b77bb0aac97e02d7c12c1c..49cab04de2bf09bf97acf272a9998ae7dd34a885 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -68,11 +68,11 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
   }
 }
 
-
 object SQLMetrics {
   private val SUM_METRIC = "sum"
   private val SIZE_METRIC = "size"
   private val TIMING_METRIC = "timing"
+  private val AVERAGE_METRIC = "average"
 
   def createMetric(sc: SparkContext, name: String): SQLMetric = {
     val acc = new SQLMetric(SUM_METRIC)
@@ -102,6 +102,22 @@ object SQLMetrics {
     acc
   }
 
+  /**
+   * Create a metric to report an average value, displayed as (min, med, max) across tasks,
+   * e.g. the average hash map probe. Because `SQLMetric` stores long values, callers should
+   * take the ceiling of the average before storing it. The metric is meant to be set once,
+   * at the end of a task; the initial zero values of tasks that never set it are excluded
+   * from the displayed aggregation.
+   */
+  def createAverageMetric(sc: SparkContext, name: String): SQLMetric = {
+    // The final result of this metric in the physical operator UI may look like:
+    // probe avg (min, med, max):
+    // (1, 2, 6)
+    val acc = new SQLMetric(AVERAGE_METRIC)
+    acc.register(sc, name = Some(s"$name (min, med, max)"), countFailedValues = false)
+    acc
+  }
+
   /**
    * A function that defines how we aggregate the final accumulator results among all tasks,
    * and represent it in string for a SQL physical operator.
@@ -110,6 +126,20 @@ object SQLMetrics {
     if (metricsType == SUM_METRIC) {
       val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
       numberFormat.format(values.sum)
+    } else if (metricsType == AVERAGE_METRIC) {
+      val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
+
+      val validValues = values.filter(_ > 0)
+      val Seq(min, med, max) = {
+        val metric = if (validValues.isEmpty) {
+          Seq.fill(3)(0L)
+        } else {
+          val sorted = validValues.sorted
+          Seq(sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
+        }
+        metric.map(numberFormat.format)
+      }
+      s"\n($min, $med, $max)"
     } else {
       val strFormat: Long => String = if (metricsType == SIZE_METRIC) {
         Utils.bytesToString
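The `AVERAGE_METRIC` branch of `stringValue` above reduces the per-task values to a (min, med, max) triple, after dropping the zero placeholders left by tasks that never set the metric. A self-contained sketch of that aggregation over hypothetical task values:

```scala
import java.text.NumberFormat
import java.util.Locale

// Hypothetical per-task metric values; the 0L stands for a task that never set
// the metric and is filtered out, exactly as in stringValue.
val values = Seq(0L, 1L, 2L, 6L)
val numberFormat = NumberFormat.getIntegerInstance(Locale.US)

val validValues = values.filter(_ > 0)
val sorted = validValues.sorted
val Seq(min, med, max) =
  Seq(sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
    .map(v => numberFormat.format(v))

println(s"($min, $med, $max)") // prints "(1, 2, 6)", as in the createAverageMetric comment
```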
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 6cf18de0cc76846d96c9db3675f75ae967c8ae4c..50d8e3024598dd8c713ac7b9ec4089b183587af6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -111,8 +111,7 @@ class UnsafeFixedWidthAggregationMapSuite
       groupKeySchema,
       taskMemoryManager,
       1024, // initial capacity,
-      PAGE_SIZE_BYTES,
-      false // disable perf metrics
+      PAGE_SIZE_BYTES
     )
     assert(!map.iterator().next())
     map.free()
@@ -125,8 +124,7 @@ class UnsafeFixedWidthAggregationMapSuite
       groupKeySchema,
       taskMemoryManager,
       1024, // initial capacity
-      PAGE_SIZE_BYTES,
-      false // disable perf metrics
+      PAGE_SIZE_BYTES
     )
     val groupKey = InternalRow(UTF8String.fromString("cats"))
 
@@ -152,8 +150,7 @@ class UnsafeFixedWidthAggregationMapSuite
       groupKeySchema,
       taskMemoryManager,
       128, // initial capacity
-      PAGE_SIZE_BYTES,
-      false // disable perf metrics
+      PAGE_SIZE_BYTES
     )
     val rand = new Random(42)
     val groupKeys: Set[String] = Seq.fill(512)(rand.nextString(1024)).toSet
@@ -178,8 +175,7 @@ class UnsafeFixedWidthAggregationMapSuite
       groupKeySchema,
       taskMemoryManager,
       128, // initial capacity
-      PAGE_SIZE_BYTES,
-      false // disable perf metrics
+      PAGE_SIZE_BYTES
     )
 
     val keys = randomStrings(1024).take(512)
@@ -226,8 +222,7 @@ class UnsafeFixedWidthAggregationMapSuite
       groupKeySchema,
       taskMemoryManager,
       128, // initial capacity
-      PAGE_SIZE_BYTES,
-      false // disable perf metrics
+      PAGE_SIZE_BYTES
     )
     val sorter = map.destructAndCreateExternalSorter()
 
@@ -267,8 +262,7 @@ class UnsafeFixedWidthAggregationMapSuite
       StructType(Nil),
       taskMemoryManager,
       128, // initial capacity
-      PAGE_SIZE_BYTES,
-      false // disable perf metrics
+      PAGE_SIZE_BYTES
     )
     (1 to 10).foreach { i =>
       val buf = map.getAggregationBuffer(UnsafeRow.createFromByteArray(0, 0))
@@ -312,8 +306,7 @@ class UnsafeFixedWidthAggregationMapSuite
       groupKeySchema,
       taskMemoryManager,
       128, // initial capacity
-      pageSize,
-      false // disable perf metrics
+      pageSize
     )
 
     val rand = new Random(42)
@@ -350,8 +343,7 @@ class UnsafeFixedWidthAggregationMapSuite
       groupKeySchema,
       taskMemoryManager,
       128, // initial capacity
-      pageSize,
-      false // disable perf metrics
+      pageSize
     )
 
     val rand = new Random(42)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index a4e62f1d16792686c4209006f1a9c3aea9c14571..a12ce2b9eba34f111d8cbcb5027376f2021a8c75 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.metric
 import java.io.File
 
 import scala.collection.mutable.HashMap
+import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
@@ -35,18 +36,17 @@ import org.apache.spark.util.{AccumulatorContext, JsonProtocol}
 class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   import testImplicits._
 
   /**
-   * Call `df.collect()` and verify if the collected metrics are same as "expectedMetrics".
+   * Call `df.collect()` and collect necessary metrics from execution data.
    *
    * @param df `DataFrame` to run
    * @param expectedNumOfJobs number of jobs that will run
-   * @param expectedMetrics the expected metrics. The format is
-   *                        `nodeId -> (operatorName, metric name -> metric value)`.
+   * @param expectedNodeIds the node ids of the metrics to collect from execution data.
    */
-  private def testSparkPlanMetrics(
+  private def getSparkPlanMetrics(
       df: DataFrame,
       expectedNumOfJobs: Int,
-      expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
+      expectedNodeIds: Set[Long]): Option[Map[Long, (String, Map[String, Any])]] = {
     val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
     withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
       df.collect()
@@ -63,9 +64,9 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     if (jobs.size == expectedNumOfJobs) {
       // If we can track all jobs, check the metric values
       val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId)
-      val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
+      val metrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
         df.queryExecution.executedPlan)).allNodes.filter { node =>
-        expectedMetrics.contains(node.id)
+        expectedNodeIds.contains(node.id)
       }.map { node =>
         val nodeMetrics = node.metrics.map { metric =>
           val metricValue = metricValues(metric.accumulatorId)
@@ -73,7 +74,30 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
         }.toMap
         (node.id, node.name -> nodeMetrics)
       }.toMap
+      Some(metrics)
+    } else {
+      // TODO Remove this "else" once we fix the race condition that missing the JobStarted event.
+      // Since we cannot track all jobs, the metric values could be wrong and we should not check
+      // them.
+      logWarning("Due to a race condition, we miss some jobs and cannot verify the metric values")
+      None
+    }
+  }
 
+  /**
+   * Call `df.collect()` and verify if the collected metrics are same as "expectedMetrics".
+   *
+   * @param df `DataFrame` to run
+   * @param expectedNumOfJobs number of jobs that will run
+   * @param expectedMetrics the expected metrics. The format is
+   *                        `nodeId -> (operatorName, metric name -> metric value)`.
+   */
+  private def testSparkPlanMetrics(
+      df: DataFrame,
+      expectedNumOfJobs: Int,
+      expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
+    val optActualMetrics = getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetrics.keySet)
+    optActualMetrics.foreach { actualMetrics =>
       assert(expectedMetrics.keySet === actualMetrics.keySet)
       for (nodeId <- expectedMetrics.keySet) {
         val (expectedNodeName, expectedMetricsMap) = expectedMetrics(nodeId)
@@ -83,11 +107,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
           assert(expectedMetricsMap(metricName).toString === actualMetricsMap(metricName))
         }
       }
-    } else {
-      // TODO Remove this "else" once we fix the race condition that missing the JobStarted event.
-      // Since we cannot track all jobs, the metric values could be wrong and we should not check
-      // them.
-      logWarning("Due to a race condition, we miss some jobs and cannot verify the metric values")
     }
   }
 
@@ -130,19 +149,47 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     // ... -> HashAggregate(nodeId = 2) -> Exchange(nodeId = 1)
     // -> HashAggregate(nodeId = 0)
     val df = testData2.groupBy().count() // 2 partitions
+    val expected1 = Seq(
+      Map("number of output rows" -> 2L,
+        "avg hashmap probe (min, med, max)" -> "\n(1, 1, 1)"),
+      Map("number of output rows" -> 1L,
+        "avg hashmap probe (min, med, max)" -> "\n(1, 1, 1)"))
     testSparkPlanMetrics(df, 1, Map(
-      2L -> ("HashAggregate", Map("number of output rows" -> 2L)),
-      0L -> ("HashAggregate", Map("number of output rows" -> 1L)))
+      2L -> ("HashAggregate", expected1(0)),
+      0L -> ("HashAggregate", expected1(1)))
     )
 
     // 2 partitions and each partition contains 2 keys
     val df2 = testData2.groupBy('a).count()
+    val expected2 = Seq(
+      Map("number of output rows" -> 4L,
+        "avg hashmap probe (min, med, max)" -> "\n(1, 1, 1)"),
+      Map("number of output rows" -> 3L,
+        "avg hashmap probe (min, med, max)" -> "\n(1, 1, 1)"))
     testSparkPlanMetrics(df2, 1, Map(
-      2L -> ("HashAggregate", Map("number of output rows" -> 4L)),
-      0L -> ("HashAggregate", Map("number of output rows" -> 3L)))
+      2L -> ("HashAggregate", expected2(0)),
+      0L -> ("HashAggregate", expected2(1)))
     )
   }
 
+  test("Aggregate metrics: track avg probe") {
+    val random = new Random()
+    val manyBytes = (0 until 65535).map { _ =>
+      val byteArrSize = random.nextInt(100)
+      val bytes = new Array[Byte](byteArrSize)
+      random.nextBytes(bytes)
+      (bytes, random.nextInt(100))
+    }
+    val df = manyBytes.toSeq.toDF("a", "b").repartition(1).groupBy('a).count()
+    val metrics = getSparkPlanMetrics(df, 1, Set(2L, 0L)).get
+    Seq(metrics(2L)._2("avg hashmap probe (min, med, max)"),
+        metrics(0L)._2("avg hashmap probe (min, med, max)")).foreach { probes =>
+      probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
+        assert(probe.toInt > 1)
+      }
+    }
+  }
+
   test("ObjectHashAggregate metrics") {
     // Assume the execution plan is
     // ... -> ObjectHashAggregate(nodeId = 2) -> Exchange(nodeId = 1)