diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7e3e45e56e90a188b2bedaded3b42882a5f15d20..9610506e138ff19d1c7a3e02701058bcb732dbaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -210,7 +210,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case PhysicalAggregation(
           namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) =>
 
-        aggregate.Utils.planStreamingAggregation(
+        aggregate.AggUtils.planStreamingAggregation(
           namedGroupingExpressions,
           aggregateExpressions,
           rewrittenResultExpressions,
@@ -243,20 +243,20 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             sys.error("Distinct columns cannot exist in Aggregate operator containing " +
               "aggregate functions which don't support partial aggregation.")
           } else {
-            aggregate.Utils.planAggregateWithoutPartial(
+            aggregate.AggUtils.planAggregateWithoutPartial(
               groupingExpressions,
               aggregateExpressions,
               resultExpressions,
               planLater(child))
           }
         } else if (functionsWithDistinct.isEmpty) {
-          aggregate.Utils.planAggregateWithoutDistinct(
+          aggregate.AggUtils.planAggregateWithoutDistinct(
             groupingExpressions,
             aggregateExpressions,
             resultExpressions,
             planLater(child))
         } else {
-          aggregate.Utils.planAggregateWithOneDistinct(
+          aggregate.AggUtils.planAggregateWithOneDistinct(
             groupingExpressions,
             functionsWithDistinct,
             functionsWithoutDistinct,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index cd9ba7c75b91dc0985eb91b60300ee273d33ad89..d3e8d4e8e41a2da21a26374229db618cfeb225cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.aggregate.TungstenAggregate
+import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
@@ -37,7 +37,7 @@ trait CodegenSupport extends SparkPlan {
 
   /** Prefix used in the current operator's variable names. */
   private def variablePrefix: String = this match {
-    case _: TungstenAggregate => "agg"
+    case _: HashAggregateExec => "agg"
     case _: BroadcastHashJoinExec => "bhj"
     case _: SortMergeJoinExec => "smj"
     case _: RDDScanExec => "rdd"
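The variablePrefix change above is mostly cosmetic, but it is visible in generated code: every variable whole-stage codegen emits for the renamed operator carries the "agg" prefix. A quick shell-style way to see this, assuming a build with this patch applied, spark bound to a SparkSession, and the debugCodegen helper from the execution.debug package object:

import org.apache.spark.sql.execution.debug._

// Dump the Java source produced by WholeStageCodegenExec for a trivial
// aggregate; the aggregate's variables show up with the "agg" prefix
// (e.g. agg_bufIsNull, agg_bufValue).
val q = spark.range(100).groupBy().count()
q.debugCodegen()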
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
similarity index 98%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index d617a048130e8413709998dac94231581a3c940c..a9ec0c8709db78d0cbe6991fab256484ed5bd619 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.streaming.{StateStoreRestoreExec, StateSto
 /**
  * Utility functions used by the query planner to convert our plan to new aggregation code path.
  */
-object Utils {
+object AggUtils {
 
   def planAggregateWithoutPartial(
       groupingExpressions: Seq[NamedExpression],
@@ -35,7 +35,7 @@ object Utils {
     val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete))
     val completeAggregateAttributes = completeAggregateExpressions.map(_.resultAttribute)
 
-    SortBasedAggregateExec(
+    SortAggregateExec(
       requiredChildDistributionExpressions = Some(groupingExpressions),
       groupingExpressions = groupingExpressions,
       aggregateExpressions = completeAggregateExpressions,
@@ -54,10 +54,10 @@ object Utils {
       initialInputBufferOffset: Int = 0,
       resultExpressions: Seq[NamedExpression] = Nil,
       child: SparkPlan): SparkPlan = {
-    val usesTungstenAggregate = TungstenAggregate.supportsAggregate(
+    val useHash = HashAggregateExec.supportsAggregate(
       aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-    if (usesTungstenAggregate) {
-      TungstenAggregate(
+    if (useHash) {
+      HashAggregateExec(
         requiredChildDistributionExpressions = requiredChildDistributionExpressions,
         groupingExpressions = groupingExpressions,
         aggregateExpressions = aggregateExpressions,
@@ -66,7 +66,7 @@ object Utils {
         resultExpressions = resultExpressions,
         child = child)
     } else {
-      SortBasedAggregateExec(
+      SortAggregateExec(
         requiredChildDistributionExpressions = requiredChildDistributionExpressions,
         groupingExpressions = groupingExpressions,
         aggregateExpressions = aggregateExpressions,
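The hash-versus-sort dispatch above hinges entirely on HashAggregateExec.supportsAggregate, that is, on whether UnsafeFixedWidthAggregationMap can hold the aggregation buffer. A minimal sketch of that decision, assuming a patched Spark build on the classpath (AggPathDemo and chosenPath are illustrative names, not part of the patch):

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object AggPathDemo {
  // Mirrors the useHash test in planAggregateWithoutDistinct: hash aggregation
  // requires every buffer field to be a fixed-width, in-place-updatable type.
  def chosenPath(bufferAttributes: Seq[Attribute]): String = {
    val schema = StructType.fromAttributes(bufferAttributes)
    if (UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(schema)) "hash" else "sort"
  }

  def main(args: Array[String]): Unit = {
    println(chosenPath(Seq(AttributeReference("sum", LongType)())))   // hash
    println(chosenPath(Seq(AttributeReference("buf", StringType)()))) // sort: var-length buffer
  }
}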
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
similarity index 98%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 091177959bedbc466a194524c71dab261ba98ff5..fad81b558cfcb526a2a01d7ed74a88b12d243190 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -30,7 +30,10 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 import org.apache.spark.unsafe.KVIterator
 
-case class TungstenAggregate(
+/**
+ * Hash-based aggregate operator that can also fall back to sorting when data exceeds available memory.
+ */
+case class HashAggregateExec(
     requiredChildDistributionExpressions: Option[Seq[Expression]],
     groupingExpressions: Seq[NamedExpression],
     aggregateExpressions: Seq[AggregateExpression],
@@ -44,7 +47,7 @@ case class TungstenAggregate(
     aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
   }
 
-  require(TungstenAggregate.supportsAggregate(aggregateBufferAttributes))
+  require(HashAggregateExec.supportsAggregate(aggregateBufferAttributes))
 
   override lazy val allAttributes: Seq[Attribute] =
     child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
@@ -769,15 +772,15 @@ case class TungstenAggregate(
         val keyString = groupingExpressions.mkString("[", ",", "]")
         val functionString = allAggregateExpressions.mkString("[", ",", "]")
         val outputString = output.mkString("[", ",", "]")
-        s"Aggregate(key=$keyString, functions=$functionString, output=$outputString)"
+        s"HashAggregate(key=$keyString, functions=$functionString, output=$outputString)"
       case Some(fallbackStartsAt) =>
-        s"AggregateWithControlledFallback $groupingExpressions " +
+        s"HashAggregateWithControlledFallback $groupingExpressions " +
           s"$allAggregateExpressions $resultExpressions fallbackStartsAt=$fallbackStartsAt"
     }
   }
 }
 
-object TungstenAggregate {
+object HashAggregateExec {
   def supportsAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean = {
     val aggregationBufferSchema = StructType.fromAttributes(aggregateBufferAttributes)
     UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(aggregationBufferSchema)
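End to end, the rename shows up directly in physical plans. A small standalone check that the planner now produces HashAggregateExec when the aggregation buffer is fixed-width (a sketch; InspectAggPlan and the local-mode session are illustrative, not part of the patch):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

object InspectAggPlan {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("agg-plan").getOrCreate()
    import spark.implicits._

    // A sum over longs keeps the aggregation buffer fixed-width, so the
    // planner should choose the hash-based operator.
    val df = Seq((1, 10L), (1, 20L), (2, 5L)).toDF("k", "v").groupBy("k").sum("v")
    val usesHash = df.queryExecution.executedPlan
      .find(_.isInstanceOf[HashAggregateExec]).isDefined
    println(s"HashAggregateExec planned: $usesHash")
    spark.stop()
  }
}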
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
similarity index 98%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index af1fb4c604c81fc39ff1dc7e4a242b7cc849b382..9e48ff8d707bdc8ea49ae1fb30e2e7ba40970935 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -26,7 +26,10 @@ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistrib
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
-case class SortBasedAggregateExec(
+/**
+ * Sort-based aggregate operator.
+ */
+case class SortAggregateExec(
     requiredChildDistributionExpressions: Option[Seq[Expression]],
     groupingExpressions: Seq[NamedExpression],
     aggregateExpressions: Seq[AggregateExpression],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 0e18ade09cbe5dc5a63e8dde4fa28a1e73ecb3e9..a02e48d849ebfc0ee7c1cec7423ed2753683787b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.Matchers._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.aggregate.TungstenAggregate
+import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -1227,7 +1227,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   private def verifyNonExchangingAgg(df: DataFrame) = {
     var atFirstAgg: Boolean = false
     df.queryExecution.executedPlan.foreach {
-      case agg: TungstenAggregate =>
+      case agg: HashAggregateExec =>
         atFirstAgg = !atFirstAgg
       case _ =>
         if (atFirstAgg) {
@@ -1242,7 +1242,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   private def verifyExchangingAgg(df: DataFrame) = {
     var atFirstAgg: Boolean = false
     df.queryExecution.executedPlan.foreach {
-      case agg: TungstenAggregate =>
+      case agg: HashAggregateExec =>
         if (atFirstAgg) {
           fail("Should not have back to back Aggregates")
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 49a0ba1f1149bb1e2f73f070292f68449a4171f8..1a7f6ebbb29564b9edfcd74453a94d9a885b2ac9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -246,7 +246,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       val df = sql(sqlText)
       // First, check if we have GeneratedAggregate.
      val hasGeneratedAgg = df.queryExecution.sparkPlan
-        .collect { case _: aggregate.TungstenAggregate => true }
+        .collect { case _: aggregate.HashAggregateExec => true }
         .nonEmpty
       if (!hasGeneratedAgg) {
         fail(
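Note the two matching styles in these suites: SQLQuerySuite above pattern-matches on queryExecution.sparkPlan, while WholeStageCodegenSuite below matches on executedPlan. sparkPlan is the plan before the physical preparation rules run; executedPlan has CollapseCodegenStages applied, so the aggregate is typically wrapped in WholeStageCodegenExec and has to be reached through its child. A shell-style sketch of the difference (assumes spark is a SparkSession on a patched build):

import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

val q = spark.range(10).groupBy().count()

// Before preparation: the aggregate is a plain node in the tree.
q.queryExecution.sparkPlan.collect { case h: HashAggregateExec => h }

// After preparation: unwrap the codegen stage first, as the suite below does.
q.queryExecution.executedPlan.collect {
  case w: WholeStageCodegenExec if w.child.isInstanceOf[HashAggregateExec] => w
}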
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index f86955e5a5bc4717bf41ad5813307209a61bbc4d..68f0ee864f47ffbe0c4e46e0f2a8e2990eb63850 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.aggregate.TungstenAggregate
+import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions.{avg, broadcast, col, max}
@@ -39,7 +39,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     val plan = df.queryExecution.executedPlan
     assert(plan.find(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-        p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined)
+        p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
     assert(df.collect() === Array(Row(9, 4.5)))
   }
 
@@ -48,7 +48,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     val plan = df.queryExecution.executedPlan
     assert(plan.find(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-        p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined)
+        p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
     assert(df.collect() === Array(Row(0, 1), Row(1, 1), Row(2, 1)))
   }
 
@@ -110,7 +110,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     val plan = ds.queryExecution.executedPlan
     assert(plan.find(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
-        p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined)
+        p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
     assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 7e9160febdec7927bba124a5277a50c284087182..d3639d97355b5b06de42d4cdb04ccbc2c90bd254 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -133,25 +133,21 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     testSparkPlanMetrics(ds.toDF(), 1, Map.empty)
   }
 
-  test("TungstenAggregate metrics") {
+  test("Aggregate metrics") {
     // Assume the execution plan is
-    // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
-    // -> TungstenAggregate(nodeId = 0)
+    // ... -> HashAggregate(nodeId = 2) -> Exchange(nodeId = 1)
+    // -> HashAggregate(nodeId = 0)
     val df = testData2.groupBy().count() // 2 partitions
     testSparkPlanMetrics(df, 1, Map(
-      2L -> ("TungstenAggregate", Map(
-        "number of output rows" -> 2L)),
-      0L -> ("TungstenAggregate", Map(
-        "number of output rows" -> 1L)))
+      2L -> ("HashAggregate", Map("number of output rows" -> 2L)),
+      0L -> ("HashAggregate", Map("number of output rows" -> 1L)))
     )
     // 2 partitions and each partition contains 2 keys
     val df2 = testData2.groupBy('a).count()
     testSparkPlanMetrics(df2, 1, Map(
-      2L -> ("TungstenAggregate", Map(
-        "number of output rows" -> 4L)),
-      0L -> ("TungstenAggregate", Map(
-        "number of output rows" -> 3L)))
+      2L -> ("HashAggregate", Map("number of output rows" -> 4L)),
+      0L -> ("HashAggregate", Map("number of output rows" -> 3L)))
     )
   }
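The renamed node carries the same metrics as before; testSparkPlanMetrics reads them through the UI model, but they can also be read straight off the executed plan. A shell-style sketch (assumes spark is a SparkSession on a patched build; numOutputRows is, to my understanding, the internal key behind "number of output rows"):

import org.apache.spark.sql.execution.aggregate.HashAggregateExec

val df = spark.range(100).groupBy().count()
df.collect() // run the query so the metrics get populated

df.queryExecution.executedPlan.foreach {
  case h: HashAggregateExec =>
    println(s"${h.nodeName}: ${h.metrics("numOutputRows").value} output rows")
  case _ =>
}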