Commit 4cea9da2 authored by Ergin Seyfe, committed by Sean Owen

[SPARK-17480][SQL] Improve performance by removing or caching List.length which is O(n)

## What changes were proposed in this pull request?
Scala's List.length method is O(N), which makes the gatherCompressibilityStats function O(N^2). Eliminate the List.length calls by rewriting the loop in idiomatic Scala.

https://github.com/scala/scala/blob/2.10.x/src/library/scala/collection/LinearSeqOptimized.scala#L36

As suggested, the fix was extended to the HiveInspectors and AggregationIterator classes as well. A minimal sketch of the pattern is shown below.
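
For illustration, here is a minimal sketch (not the actual Spark code; the names are made up) contrasting the pattern being removed with the idiomatic replacement. Because `List.length` traverses the whole list, re-evaluating it as a loop bound re-walks the list on every iteration:

```scala
// Anti-pattern: `xs.length` is O(n) for a List (see LinearSeqOptimized.length),
// so re-evaluating it as the loop bound makes the loop O(n^2).
def sumQuadratic(xs: List[Int]): Long = {
  var total = 0L
  var i = 0
  while (i < xs.length) { // length is recomputed on every iteration
    total += xs(i)        // indexed access is also O(i) for a List
    i += 1
  }
  total
}

// The same spirit as this patch: either cache the length once up front,
// or avoid indexing entirely and let foreach traverse the list a single time.
def sumLinear(xs: List[Int]): Long = {
  var total = 0L
  xs.foreach(total += _)
  total
}
```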

## How was this patch tested?
Profiled a Spark job and found that CompressibleColumnBuilder accounts for 39% of the CPU time. Of that 39%, CompressibleColumnBuilder->gatherCompressibilityStats accounts for 23%, and 6.24% of the CPU time is spent in List.length, which is called inside gatherCompressibilityStats.

After this change, that 6.24% of CPU time is saved.
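
For reference, a rough, hypothetical micro-benchmark (not the profiler setup described above; `timeIt`, `LengthBenchmark`, and the list size are made up for illustration) that makes the quadratic behaviour visible:

```scala
// Hypothetical micro-benchmark: compares `while (i < xs.length)` against a
// cached length over a 50,000-element List. The first variant re-traverses
// the list on every iteration; the second computes the length only once.
object LengthBenchmark {
  private def timeIt(label: String)(body: => Unit): Unit = {
    val start = System.nanoTime()
    body
    println(f"$label%-15s ${(System.nanoTime() - start) / 1e6}%.1f ms")
  }

  def main(args: Array[String]): Unit = {
    val xs = List.fill(50000)(1)

    timeIt("length in loop") {
      var i = 0
      while (i < xs.length) { i += 1 }
    }

    timeIt("cached length") {
      val n = xs.length
      var i = 0
      while (i < n) { i += 1 }
    }
  }
}
```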

Author: Ergin Seyfe <eseyfe@fb.com>

Closes #15032 from seyfe/gatherCompressibilityStats.
parent 18b4f035
@@ -73,9 +73,10 @@ abstract class AggregationIterator(
       startingInputBufferOffset: Int): Array[AggregateFunction] = {
     var mutableBufferOffset = 0
     var inputBufferOffset: Int = startingInputBufferOffset
-    val functions = new Array[AggregateFunction](expressions.length)
+    val expressionsLength = expressions.length
+    val functions = new Array[AggregateFunction](expressionsLength)
     var i = 0
-    while (i < expressions.length) {
+    while (i < expressionsLength) {
       val func = expressions(i).aggregateFunction
       val funcWithBoundReferences: AggregateFunction = expressions(i).mode match {
         case Partial | Complete if func.isInstanceOf[ImperativeAggregate] =>
@@ -171,7 +172,7 @@ abstract class AggregationIterator(
             case PartialMerge | Final =>
               (buffer: MutableRow, row: InternalRow) => ae.merge(buffer, row)
           }
-      }
+      }.toArray
       // This projection is used to merge buffer values for all expression-based aggregates.
       val aggregationBufferSchema = functions.flatMap(_.aggBufferAttributes)
       val updateProjection =
@@ -66,11 +66,7 @@ private[columnar] trait CompressibleColumnBuilder[T <: AtomicType]
   }

   private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
-    var i = 0
-    while (i < compressionEncoders.length) {
-      compressionEncoders(i).gatherCompressibilityStats(row, ordinal)
-      i += 1
-    }
+    compressionEncoders.foreach(_.gatherCompressibilityStats(row, ordinal))
   }

   abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
@@ -756,7 +756,8 @@ private[hive] trait HiveInspectors {
       cache: Array[AnyRef],
       dataTypes: Array[DataType]): Array[AnyRef] = {
     var i = 0
-    while (i < inspectors.length) {
+    val length = inspectors.length
+    while (i < length) {
       cache(i) = wrap(row.get(i, dataTypes(i)), inspectors(i), dataTypes(i))
       i += 1
     }
@@ -769,7 +770,8 @@ private[hive] trait HiveInspectors {
       cache: Array[AnyRef],
       dataTypes: Array[DataType]): Array[AnyRef] = {
     var i = 0
-    while (i < inspectors.length) {
+    val length = inspectors.length
+    while (i < length) {
       cache(i) = wrap(row(i), inspectors(i), dataTypes(i))
       i += 1
     }