Commit 60977889 authored by Reynold Xin

[SPARK-20136][SQL] Add num files and metadata operation timing to scan operator metrics

## What changes were proposed in this pull request?
This patch adds explicit metadata operation timing and the number of files read to the data source scan operator's metrics. Both are useful for performance profiling.

Screenshot of a UI with this change (num files and metadata time are new metrics):

<img width="321" alt="screen shot 2017-03-29 at 12 29 28 am" src="https://cloud.githubusercontent.com/assets/323388/24443272/d4ea58c0-1416-11e7-8940-ecb69375554a.png">
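For illustration, here is a minimal sketch of how the new metrics could be inspected programmatically after this patch. The local path, session setup, and the `collect()` trigger are assumptions for the example; `FileSourceScanExec` and its `metrics` map are the internals touched by this change.

```scala
// Hypothetical demo: write a small Parquet table, scan it, and print the
// scan operator's metrics. With this patch, "number of files" and
// "metadata time (ms)" should appear alongside the existing metrics.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec

val spark = SparkSession.builder().master("local[*]").appName("scan-metrics").getOrCreate()
spark.range(1000).write.mode("overwrite").parquet("/tmp/scan_metrics_demo")

val df = spark.read.parquet("/tmp/scan_metrics_demo")
df.collect()  // forces the scan, which evaluates the lazy selectedPartitions

// Walk the executed plan, find the scan node, and dump its metric values.
df.queryExecution.executedPlan.collect {
  case scan: FileSourceScanExec =>
    scan.metrics.foreach { case (name, metric) => println(s"$name = ${metric.value}") }
}
```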

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #17465 from rxin/SPARK-20136.
parent 22f07fef
```diff
@@ -171,8 +171,20 @@ case class FileSourceScanExec(
       false
     }
 
-  @transient private lazy val selectedPartitions =
-    relation.location.listFiles(partitionFilters, dataFilters)
+  @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+    val startTime = System.nanoTime()
+    val ret = relation.location.listFiles(partitionFilters, dataFilters)
+    val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
+
+    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
+    metrics("metadataTime").add(timeTaken)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("metadataTime") :: Nil)
+
+    ret
+  }
 
   override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -293,6 +305,8 @@ case class FileSourceScanExec(
 
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
+      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
```
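A note on the driver-side posting above: `listFiles` runs on the driver during query planning rather than inside a task, so the accumulator updates never flow back through a task-completion event and must be pushed to the UI explicitly via `SQLMetrics.postDriverMetricUpdates` for the current execution ID. The measurement itself is the plain nanoTime-to-milliseconds pattern; a standalone sketch, with an illustrative helper name and `java.io.File` standing in for the real `listFiles` call:

```scala
// Standalone sketch of the measurement pattern in the diff: time a
// driver-side metadata operation with nanoTime and report milliseconds.
// The helper name and the java.io stand-in are illustrative, not Spark APIs.
def timedMillis[T](body: => T): (T, Long) = {
  val start = System.nanoTime()
  val result = body
  (result, (System.nanoTime() - start) / 1000 / 1000)
}

val (entries, ms) = timedMillis {
  Option(new java.io.File("/tmp").listFiles()).map(_.toSeq).getOrElse(Seq.empty)
}
println(s"listed ${entries.size} entries in $ms ms")
```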