From bcf3643f94643b76a51cfab0a68871d106eef8af Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh <viirya@gmail.com> Date: Tue, 13 Jun 2017 10:10:35 -0700 Subject: [PATCH] [SPARK-21051][SQL] Add hash map metrics to aggregate ## What changes were proposed in this pull request? This adds an average hash map probe metric to hash aggregate. `BytesToBytesMap` already has an API to get this metric; this PR adds an API to `UnsafeFixedWidthAggregationMap` to access it. Preparing a test for this metric is tricky, because we don't know in advance which keys will collide. For now, the test case generates random data large enough to produce the desired number of probes. TODO in a later PR: add hash map metrics to join. ## How was this patch tested? Added a test to SQLMetricsSuite. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #18258 from viirya/SPARK-20953. --- .../UnsafeFixedWidthAggregationMap.java | 23 ++---- .../aggregate/HashAggregateExec.scala | 27 +++++-- .../TungstenAggregationIterator.scala | 10 ++- .../sql/execution/metric/SQLMetrics.scala | 32 +++++++- .../UnsafeFixedWidthAggregationMapSuite.scala | 24 ++---- .../execution/metric/SQLMetricsSuite.scala | 79 +++++++++++++++---- 6 files changed, 135 insertions(+), 60 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java index cd521c52d1..8fea46a58e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java @@ -63,8 +63,6 @@ public final class UnsafeFixedWidthAggregationMap { */ private final UnsafeRow currentAggregationBuffer; - private final boolean enablePerfMetrics; - /** * @return true if UnsafeFixedWidthAggregationMap supports aggregation buffers with the given * schema, false otherwise. @@ -87,7 +85,6 @@ public final class UnsafeFixedWidthAggregationMap { * @param taskMemoryManager the memory manager used to allocate our Unsafe memory structures. * @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing). * @param pageSizeBytes the data page size, in bytes; limits the maximum record size. 
- * @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact) */ public UnsafeFixedWidthAggregationMap( InternalRow emptyAggregationBuffer, @@ -95,15 +92,13 @@ public final class UnsafeFixedWidthAggregationMap { StructType groupingKeySchema, TaskMemoryManager taskMemoryManager, int initialCapacity, - long pageSizeBytes, - boolean enablePerfMetrics) { + long pageSizeBytes) { this.aggregationBufferSchema = aggregationBufferSchema; this.currentAggregationBuffer = new UnsafeRow(aggregationBufferSchema.length()); this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema); this.groupingKeySchema = groupingKeySchema; this.map = - new BytesToBytesMap(taskMemoryManager, initialCapacity, pageSizeBytes, enablePerfMetrics); - this.enablePerfMetrics = enablePerfMetrics; + new BytesToBytesMap(taskMemoryManager, initialCapacity, pageSizeBytes, true); // Initialize the buffer for aggregation value final UnsafeProjection valueProjection = UnsafeProjection.create(aggregationBufferSchema); @@ -223,15 +218,11 @@ public final class UnsafeFixedWidthAggregationMap { map.free(); } - @SuppressWarnings("UseOfSystemOutOrSystemErr") - public void printPerfMetrics() { - if (!enablePerfMetrics) { - throw new IllegalStateException("Perf metrics not enabled"); - } - System.out.println("Average probes per lookup: " + map.getAverageProbesPerLookup()); - System.out.println("Number of hash collisions: " + map.getNumHashCollisions()); - System.out.println("Time spent resizing (ns): " + map.getTimeSpentResizingNs()); - System.out.println("Total memory consumption (bytes): " + map.getTotalMemoryConsumption()); + /** + * Gets the average number of hash map probes per lookup for the underlying `BytesToBytesMap`. + */ + public double getAverageProbesPerLookup() { + return map.getAverageProbesPerLookup(); } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 68c8e6ce62..9df5e58f70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -59,7 +59,8 @@ case class HashAggregateExec( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"), "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), - "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time")) + "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"), + "avgHashmapProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hashmap probe")) override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute) @@ -93,6 +94,7 @@ case class HashAggregateExec( val numOutputRows = longMetric("numOutputRows") val peakMemory = longMetric("peakMemory") val spillSize = longMetric("spillSize") + val avgHashmapProbe = longMetric("avgHashmapProbe") child.execute().mapPartitions { iter => @@ -116,7 +118,8 @@ case class HashAggregateExec( testFallbackStartsAt, numOutputRows, peakMemory, - spillSize) + spillSize, + avgHashmapProbe) if (!hasInput && groupingExpressions.isEmpty) { numOutputRows += 1 Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput()) @@ -157,7 +160,7 @@ case class HashAggregateExec( } } - // The variables used as aggregation buffer + // The variables used 
as aggregation buffer. Only used for aggregation without keys. private var bufVars: Seq[ExprCode] = _ private def doProduceWithoutKeys(ctx: CodegenContext): String = { @@ -312,8 +315,7 @@ case class HashAggregateExec( groupingKeySchema, TaskContext.get().taskMemoryManager(), 1024 * 16, // initial capacity - TaskContext.get().taskMemoryManager().pageSizeBytes, - false // disable tracking of performance metrics + TaskContext.get().taskMemoryManager().pageSizeBytes ) } @@ -341,7 +343,8 @@ case class HashAggregateExec( hashMap: UnsafeFixedWidthAggregationMap, sorter: UnsafeKVExternalSorter, peakMemory: SQLMetric, - spillSize: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = { + spillSize: SQLMetric, + avgHashmapProbe: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = { // update peak execution memory val mapMemory = hashMap.getPeakMemoryUsedBytes @@ -351,6 +354,10 @@ case class HashAggregateExec( peakMemory.add(maxMemory) metrics.incPeakExecutionMemory(maxMemory) + // Update average hashmap probe + val avgProbes = hashMap.getAverageProbesPerLookup() + avgHashmapProbe.add(avgProbes.ceil.toLong) + if (sorter == null) { // not spilled return hashMap.iterator() @@ -577,6 +584,7 @@ case class HashAggregateExec( val doAgg = ctx.freshName("doAggregateWithKeys") val peakMemory = metricTerm(ctx, "peakMemory") val spillSize = metricTerm(ctx, "spillSize") + val avgHashmapProbe = metricTerm(ctx, "avgHashmapProbe") def generateGenerateCode(): String = { if (isFastHashMapEnabled) { @@ -602,7 +610,8 @@ case class HashAggregateExec( ${if (isFastHashMapEnabled) { s"$iterTermForFastHashMap = $fastHashMapTerm.rowIterator();"} else ""} - $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize); + $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize, + $avgHashmapProbe); } """) @@ -792,6 +801,8 @@ case class HashAggregateExec( | $unsafeRowBuffer = | $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value}); | } + | // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based + | // aggregation after processing all input rows. | if ($unsafeRowBuffer == null) { | if ($sorterTerm == null) { | $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter(); @@ -800,7 +811,7 @@ case class HashAggregateExec( | } | $resetCounter | // the hash map had be spilled, it should have enough memory now, - | // try to allocate buffer again. + | // try to allocate buffer again. 
| $unsafeRowBuffer = | $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value}); | if ($unsafeRowBuffer == null) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index 2988161ee5..8efa95d48a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -88,7 +88,8 @@ class TungstenAggregationIterator( testFallbackStartsAt: Option[(Int, Int)], numOutputRows: SQLMetric, peakMemory: SQLMetric, - spillSize: SQLMetric) + spillSize: SQLMetric, + avgHashmapProbe: SQLMetric) extends AggregationIterator( groupingExpressions, originalInputAttributes, @@ -162,8 +163,7 @@ class TungstenAggregationIterator( StructType.fromAttributes(groupingExpressions.map(_.toAttribute)), TaskContext.get().taskMemoryManager(), 1024 * 16, // initial capacity - TaskContext.get().taskMemoryManager().pageSizeBytes, - false // disable tracking of performance metrics + TaskContext.get().taskMemoryManager().pageSizeBytes ) // The function used to read and process input rows. When processing input rows, @@ -420,6 +420,10 @@ class TungstenAggregationIterator( peakMemory += maxMemory spillSize += metrics.memoryBytesSpilled - spillSizeBefore metrics.incPeakExecutionMemory(maxMemory) + + // Update average hashmap probe if this is the last record. + val averageProbes = hashMap.getAverageProbesPerLookup() + avgHashmapProbe.add(averageProbes.ceil.toLong) } numOutputRows += 1 res diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index ef982a4ebd..49cab04de2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -68,11 +68,11 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato } } - object SQLMetrics { private val SUM_METRIC = "sum" private val SIZE_METRIC = "size" private val TIMING_METRIC = "timing" + private val AVERAGE_METRIC = "average" def createMetric(sc: SparkContext, name: String): SQLMetric = { val acc = new SQLMetric(SUM_METRIC) @@ -102,6 +102,22 @@ object SQLMetrics { acc } + /** + * Create a metric to report an average value (displayed as min, med, max across tasks), such + * as the avg hashmap probe. Because `SQLMetric` stores long values, we take the ceil of the average + * values before storing them. This metric is used to record an average value computed at the + * end of a task. It should be set once per task. The initial values (zeros) of this metric are + * excluded when the final result is computed. + */ + def createAverageMetric(sc: SparkContext, name: String): SQLMetric = { + // The final result of this metric in physical operator UI may look like: + // probe avg (min, med, max): + // (1, 2, 6) + val acc = new SQLMetric(AVERAGE_METRIC) + acc.register(sc, name = Some(s"$name (min, med, max)"), countFailedValues = false) + acc + } + /** * A function that defines how we aggregate the final accumulator results among all tasks, and represent it in string for a SQL physical operator. 
@@ -110,6 +126,20 @@ object SQLMetrics { if (metricsType == SUM_METRIC) { val numberFormat = NumberFormat.getIntegerInstance(Locale.US) numberFormat.format(values.sum) + } else if (metricsType == AVERAGE_METRIC) { + val numberFormat = NumberFormat.getIntegerInstance(Locale.US) + + val validValues = values.filter(_ > 0) + val Seq(min, med, max) = { + val metric = if (validValues.isEmpty) { + Seq.fill(3)(0L) + } else { + val sorted = validValues.sorted + Seq(sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1)) + } + metric.map(numberFormat.format) + } + s"\n($min, $med, $max)" } else { val strFormat: Long => String = if (metricsType == SIZE_METRIC) { Utils.bytesToString diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala index 6cf18de0cc..50d8e30245 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala @@ -111,8 +111,7 @@ class UnsafeFixedWidthAggregationMapSuite groupKeySchema, taskMemoryManager, 1024, // initial capacity, - PAGE_SIZE_BYTES, - false // disable perf metrics + PAGE_SIZE_BYTES ) assert(!map.iterator().next()) map.free() @@ -125,8 +124,7 @@ class UnsafeFixedWidthAggregationMapSuite groupKeySchema, taskMemoryManager, 1024, // initial capacity - PAGE_SIZE_BYTES, - false // disable perf metrics + PAGE_SIZE_BYTES ) val groupKey = InternalRow(UTF8String.fromString("cats")) @@ -152,8 +150,7 @@ class UnsafeFixedWidthAggregationMapSuite groupKeySchema, taskMemoryManager, 128, // initial capacity - PAGE_SIZE_BYTES, - false // disable perf metrics + PAGE_SIZE_BYTES ) val rand = new Random(42) val groupKeys: Set[String] = Seq.fill(512)(rand.nextString(1024)).toSet @@ -178,8 +175,7 @@ class UnsafeFixedWidthAggregationMapSuite groupKeySchema, taskMemoryManager, 128, // initial capacity - PAGE_SIZE_BYTES, - false // disable perf metrics + PAGE_SIZE_BYTES ) val keys = randomStrings(1024).take(512) @@ -226,8 +222,7 @@ class UnsafeFixedWidthAggregationMapSuite groupKeySchema, taskMemoryManager, 128, // initial capacity - PAGE_SIZE_BYTES, - false // disable perf metrics + PAGE_SIZE_BYTES ) val sorter = map.destructAndCreateExternalSorter() @@ -267,8 +262,7 @@ class UnsafeFixedWidthAggregationMapSuite StructType(Nil), taskMemoryManager, 128, // initial capacity - PAGE_SIZE_BYTES, - false // disable perf metrics + PAGE_SIZE_BYTES ) (1 to 10).foreach { i => val buf = map.getAggregationBuffer(UnsafeRow.createFromByteArray(0, 0)) @@ -312,8 +306,7 @@ class UnsafeFixedWidthAggregationMapSuite groupKeySchema, taskMemoryManager, 128, // initial capacity - pageSize, - false // disable perf metrics + pageSize ) val rand = new Random(42) @@ -350,8 +343,7 @@ class UnsafeFixedWidthAggregationMapSuite groupKeySchema, taskMemoryManager, 128, // initial capacity - pageSize, - false // disable perf metrics + pageSize ) val rand = new Random(42) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index a4e62f1d16..a12ce2b9eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -20,6 +20,7 @@ package 
org.apache.spark.sql.execution.metric import java.io.File import scala.collection.mutable.HashMap +import scala.util.Random import org.apache.spark.SparkFunSuite import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} @@ -35,18 +36,18 @@ import org.apache.spark.util.{AccumulatorContext, JsonProtocol} class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { import testImplicits._ + /** - * Call `df.collect()` and verify if the collected metrics are same as "expectedMetrics". + * Call `df.collect()` and collect necessary metrics from execution data. * * @param df `DataFrame` to run * @param expectedNumOfJobs number of jobs that will run - * @param expectedMetrics the expected metrics. The format is - * `nodeId -> (operatorName, metric name -> metric value)`. + * @param expectedNodeIds the node ids of the metrics to collect from execution data. */ - private def testSparkPlanMetrics( + private def getSparkPlanMetrics( df: DataFrame, expectedNumOfJobs: Int, - expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = { + expectedNodeIds: Set[Long]): Option[Map[Long, (String, Map[String, Any])]] = { val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet withSQLConf("spark.sql.codegen.wholeStage" -> "false") { df.collect() @@ -63,9 +64,9 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { if (jobs.size == expectedNumOfJobs) { // If we can track all jobs, check the metric values val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId) - val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan( + val metrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan( df.queryExecution.executedPlan)).allNodes.filter { node => - expectedMetrics.contains(node.id) + expectedNodeIds.contains(node.id) }.map { node => val nodeMetrics = node.metrics.map { metric => val metricValue = metricValues(metric.accumulatorId) @@ -73,7 +74,30 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { }.toMap (node.id, node.name -> nodeMetrics) }.toMap + Some(metrics) + } else { + // TODO Remove this "else" once we fix the race condition that causes the JobStarted event to be missed. + // Since we cannot track all jobs, the metric values could be wrong and we should not check + // them. + logWarning("Due to a race condition, we miss some jobs and cannot verify the metric values") + None + } + } + /** + * Call `df.collect()` and verify if the collected metrics are the same as "expectedMetrics". + * + * @param df `DataFrame` to run + * @param expectedNumOfJobs number of jobs that will run + * @param expectedMetrics the expected metrics. The format is + * `nodeId -> (operatorName, metric name -> metric value)`. + */ + private def testSparkPlanMetrics( + df: DataFrame, + expectedNumOfJobs: Int, + expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = { + val optActualMetrics = getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetrics.keySet) + optActualMetrics.map { actualMetrics => assert(expectedMetrics.keySet === actualMetrics.keySet) for (nodeId <- expectedMetrics.keySet) { val (expectedNodeName, expectedMetricsMap) = expectedMetrics(nodeId) @@ -83,11 +107,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { assert(expectedMetricsMap(metricName).toString === actualMetricsMap(metricName)) } } - } else { - // TODO Remove this "else" once we fix the race condition that missing the JobStarted event. - // Since we cannot track all jobs, the metric values could be wrong and we should not check - // them. 
- logWarning("Due to a race condition, we miss some jobs and cannot verify the metric values") } } @@ -130,19 +149,47 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { // ... -> HashAggregate(nodeId = 2) -> Exchange(nodeId = 1) // -> HashAggregate(nodeId = 0) val df = testData2.groupBy().count() // 2 partitions + val expected1 = Seq( + Map("number of output rows" -> 2L, + "avg hashmap probe (min, med, max)" -> "\n(1, 1, 1)"), + Map("number of output rows" -> 1L, + "avg hashmap probe (min, med, max)" -> "\n(1, 1, 1)")) testSparkPlanMetrics(df, 1, Map( - 2L -> ("HashAggregate", Map("number of output rows" -> 2L)), - 0L -> ("HashAggregate", Map("number of output rows" -> 1L))) + 2L -> ("HashAggregate", expected1(0)), + 0L -> ("HashAggregate", expected1(1))) ) // 2 partitions and each partition contains 2 keys val df2 = testData2.groupBy('a).count() + val expected2 = Seq( + Map("number of output rows" -> 4L, + "avg hashmap probe (min, med, max)" -> "\n(1, 1, 1)"), + Map("number of output rows" -> 3L, + "avg hashmap probe (min, med, max)" -> "\n(1, 1, 1)")) testSparkPlanMetrics(df2, 1, Map( - 2L -> ("HashAggregate", Map("number of output rows" -> 4L)), - 0L -> ("HashAggregate", Map("number of output rows" -> 3L))) + 2L -> ("HashAggregate", expected2(0)), + 0L -> ("HashAggregate", expected2(1))) ) } + test("Aggregate metrics: track avg probe") { + val random = new Random() + val manyBytes = (0 until 65535).map { _ => + val byteArrSize = random.nextInt(100) + val bytes = new Array[Byte](byteArrSize) + random.nextBytes(bytes) + (bytes, random.nextInt(100)) + } + val df = manyBytes.toSeq.toDF("a", "b").repartition(1).groupBy('a).count() + val metrics = getSparkPlanMetrics(df, 1, Set(2L, 0L)).get + Seq(metrics(2L)._2("avg hashmap probe (min, med, max)"), + metrics(0L)._2("avg hashmap probe (min, med, max)")).foreach { probes => + probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe => + assert(probe.toInt > 1) + } + } + } + test("ObjectHashAggregate metrics") { // Assume the execution plan is // ... -> ObjectHashAggregate(nodeId = 2) -> Exchange(nodeId = 1) -- GitLab