diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java index 69ce54390fead9008597dc0ff9eb5ecc2bc1d80f..25a565d32638da105456e4a274de267d9274e717 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java @@ -30,7 +30,7 @@ import static org.apache.spark.sql.types.DataTypes.LongType; * This is an illustrative implementation of an append-only single-key/single value aggregate hash * map that can act as a 'cache' for extremely fast key-value lookups while evaluating aggregates * (and fall back to the `BytesToBytesMap` if a given key isn't found). This can be potentially - * 'codegened' in TungstenAggregate to speed up aggregates w/ key. + * 'codegened' in HashAggregate to speed up aggregates w/ key. * * It is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the * key-value pairs. The index lookups in the array rely on linear probing (with a small number of diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala index a9ec0c8709db78d0cbe6991fab256484ed5bd619..4fbb9d554c9bf366bea69311b8e7ccbf5b9cbe63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala @@ -82,7 +82,7 @@ object AggUtils { aggregateExpressions: Seq[AggregateExpression], resultExpressions: Seq[NamedExpression], child: SparkPlan): Seq[SparkPlan] = { - // Check if we can use TungstenAggregate. + // Check if we can use HashAggregate. // 1. Create an Aggregate Operator for partial aggregations. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala index 81aacb437ba5409569e4c1b9a420093729cddbb1..34de76dd4ab4e204f1e5b232a529581ce6e67214 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala @@ -52,7 +52,7 @@ abstract class AggregationIterator( * - PartialMerge (for single distinct) * - Partial and PartialMerge (for single distinct) * - Final - * - Complete (for SortBasedAggregate with functions that does not support Partial) + * - Complete (for SortAggregate with functions that does not support Partial) * - Final and Complete (currently not used) * * TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index fad81b558cfcb526a2a01d7ed74a88b12d243190..f5bc0628b6458abba40109d34dc7eb857f691ef4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -458,7 +458,7 @@ case class HashAggregateExec( } /** - * Using the vectorized hash map in TungstenAggregate is currently supported for all primitive + * Using the vectorized hash map in HashAggregate is currently supported for all primitive * data types during partial aggregation. However, we currently only enable the hash map for a * subset of cases that've been verified to show performance improvements on our benchmarks * subject to an internal conf that sets an upper limit on the maximum length of the aggregate diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala index 61bd6eb3cde66500b23c711d144c7594a35cf000..8a3f466ccfef3207f508f2e7f2f7cb69a2eb9924 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.types._ /** * This is a helper class to generate an append-only vectorized hash map that can act as a 'cache' * for extremely fast key-value lookups while evaluating aggregates (and fall back to the - * `BytesToBytesMap` if a given key isn't found). This is 'codegened' in TungstenAggregate to speed + * `BytesToBytesMap` if a given key isn't found). This is 'codegened' in HashAggregate to speed * up aggregates w/ key. * * It is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the @@ -127,7 +127,7 @@ class VectorizedHashMapGenerator( | public $generatedClassName() { | batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, | org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); - | // TODO: Possibly generate this projection in TungstenAggregate directly + | // TODO: Possibly generate this projection in HashAggregate directly | aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate( | aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); | for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4fcd6bc0d9ec95b773f335609b4d15d3f0e6f4c6..8284e8d6d89b65798a61dcc05a6c84ab837a8b1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -251,7 +251,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { if (!hasGeneratedAgg) { fail( s""" - |Codegen is enabled, but query $sqlText does not have TungstenAggregate in the plan. + |Codegen is enabled, but query $sqlText does not have HashAggregate in the plan. |${df.queryExecution.simpleString} """.stripMargin) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index d3639d97355b5b06de42d4cdb04ccbc2c90bd254..fd956bc4ef900703914d8e6e6c1452dd5fe1df59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.sql.execution.metric @@ -135,8 +135,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { test("Aggregate metrics") { // Assume the execution plan is - // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1) - // -> TungstenAggregate(nodeId = 0) + // ... -> HashAggregate(nodeId = 2) -> Exchange(nodeId = 1) + // -> HashAggregate(nodeId = 0) val df = testData2.groupBy().count() // 2 partitions testSparkPlanMetrics(df, 1, Map( 2L -> ("HashAggregate", Map("number of output rows" -> 2L)), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 9fc5628b28dbb64d7de781e7194fe72b43a726d5..a16fe3228b1fcbc4d1e90f4a17df6171336954f7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -869,10 +869,10 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te DateType, TimestampType, ArrayType(IntegerType), MapType(StringType, LongType), struct, new UDT.MyDenseVectorUDT()) - // Right now, we will use SortBasedAggregate to handle UDAFs. - // UnsafeRow.mutableFieldTypes.asScala.toSeq will trigger SortBasedAggregate to use + // Right now, we will use SortAggregate to handle UDAFs. + // UnsafeRow.mutableFieldTypes.asScala.toSeq will trigger SortAggregate to use // UnsafeRow as the aggregation buffer. While, dataTypes will trigger - // SortBasedAggregate to use a safe row as the aggregation buffer. + // SortAggregate to use a safe row as the aggregation buffer. Seq(dataTypes, UnsafeRow.mutableFieldTypes.asScala.toSeq).foreach { dataTypes => val fields = dataTypes.zipWithIndex.map { case (dataType, index) => StructField(s"col$index", dataType, nullable = true) @@ -992,10 +992,10 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te } -class TungstenAggregationQuerySuite extends AggregationQuerySuite +class HashAggregationQuerySuite extends AggregationQuerySuite -class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite { +class HashAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite { override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = { Seq(0, 10).foreach { maxColumnarHashMapColumns => @@ -1013,7 +1013,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue case Some(errorMessage) => val newErrorMessage = s""" - |The following aggregation query failed when using TungstenAggregate with + |The following aggregation query failed when using HashAggregate with |controlled fallback (it falls back to bytes to bytes map once it has processed |${fallbackStartsAt - 1} input rows and to sort-based aggregation once it has |processed $fallbackStartsAt input rows). The query is ${actual.queryExecution}