Commit 86664338 authored by Wenchen Fan, committed by Cheng Lian

[SPARK-17528][SQL][FOLLOWUP] remove unnecessary data copy in object hash aggregate

## What changes were proposed in this pull request?

In #18483, we fixed the data copy bug when saving into `InternalRow` and removed all workarounds for this bug in the aggregate code path. However, the object hash aggregate was missed; this PR fixes it.

This patch is also a prerequisite for #17419, which shows that the DataFrame version is slower than the RDD version because of this issue.
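
For context, here is a minimal plain-Scala sketch of the hazard the removed copies used to guard against. It does not use Spark's actual `UnsafeRow`, `Projection`, or iterator classes; `MutableRow` and `reusingIterator` are made-up stand-ins for an iterator that reuses one backing buffer across rows. Because #18483 makes saving into `InternalRow` copy the data it needs, the consumer no longer has to copy rows defensively as long as it processes each row before advancing the iterator:

```scala
// Illustrative only: a plain-Scala stand-in for the buffer-reuse behaviour of
// UnsafeRow-backed iterators; none of these names come from the patch itself.
object RowReuseSketch extends App {

  // A mutable "row" whose field is overwritten in place, like the backing
  // buffer of an UnsafeRow that an iterator reuses for every element.
  final class MutableRow(var value: Int) {
    def copy(): MutableRow = new MutableRow(value)
  }

  // Returns the same MutableRow instance for every element, mimicking an
  // iterator that rewrites one shared buffer as it advances.
  def reusingIterator(values: Seq[Int]): Iterator[MutableRow] = {
    val shared = new MutableRow(0)
    values.iterator.map { v => shared.value = v; shared }
  }

  // Retaining references across next() without copying: every retained row
  // ends up showing the last value, which is the bug the old copies avoided.
  val retained = reusingIterator(Seq(1, 2, 3)).toArray
  assert(retained.map(_.value).toSeq == Seq(3, 3, 3))

  // Consuming each row immediately (as processRow does when it folds the row
  // into the aggregation buffer) is safe without a copy, which is why the
  // defensive copies can be dropped once saving into InternalRow copies data.
  var sum = 0
  reusingIterator(Seq(1, 2, 3)).foreach(row => sum += row.value)
  assert(sum == 6)
}
```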

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18712 from cloud-fan/minor.
parent 481f0792
@@ -70,10 +70,6 @@ class ObjectAggregationIterator(
       generateProcessRow(newExpressions, newFunctions, newInputAttributes)
   }
 
-  // A safe projection used to do deep clone of input rows to prevent false sharing.
-  private[this] val safeProjection: Projection =
-    FromUnsafeProjection(outputAttributes.map(_.dataType))
-
   /**
    * Start processing input rows.
    */
@@ -151,12 +147,11 @@ class ObjectAggregationIterator(
       val groupingKey = groupingProjection.apply(null)
       val buffer: InternalRow = getAggregationBufferByKey(hashMap, groupingKey)
       while (inputRows.hasNext) {
-        val newInput = safeProjection(inputRows.next())
-        processRow(buffer, newInput)
+        processRow(buffer, inputRows.next())
       }
     } else {
       while (inputRows.hasNext && !sortBased) {
-        val newInput = safeProjection(inputRows.next())
+        val newInput = inputRows.next()
         val groupingKey = groupingProjection.apply(newInput)
         val buffer: InternalRow = getAggregationBufferByKey(hashMap, groupingKey)
         processRow(buffer, newInput)
@@ -266,9 +261,7 @@ class SortBasedAggregator(
       // Firstly, update the aggregation buffer with input rows.
       while (hasNextInput &&
         groupingKeyOrdering.compare(inputIterator.getKey, groupingKey) == 0) {
-        // Since `inputIterator.getValue` is an `UnsafeRow` whose underlying buffer will be
-        // overwritten when `inputIterator` steps forward, we need to do a deep copy here.
-        processRow(result.aggregationBuffer, inputIterator.getValue.copy())
+        processRow(result.aggregationBuffer, inputIterator.getValue)
         hasNextInput = inputIterator.next()
       }
@@ -277,12 +270,7 @@ class SortBasedAggregator(
       // be called after calling processRow.
       while (hasNextAggBuffer &&
         groupingKeyOrdering.compare(initialAggBufferIterator.getKey, groupingKey) == 0) {
-        mergeAggregationBuffers(
-          result.aggregationBuffer,
-          // Since `inputIterator.getValue` is an `UnsafeRow` whose underlying buffer will be
-          // overwritten when `inputIterator` steps forward, we need to do a deep copy here.
-          initialAggBufferIterator.getValue.copy()
-        )
+        mergeAggregationBuffers(result.aggregationBuffer, initialAggBufferIterator.getValue)
         hasNextAggBuffer = initialAggBufferIterator.next()
       }