diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 28156b277f59760caae2c5a6c17fc43bc052200a..239151495f4bd7b706c737b45d0013375db8f386 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -171,8 +171,20 @@ case class FileSourceScanExec(
     false
   }
 
-  @transient private lazy val selectedPartitions =
-    relation.location.listFiles(partitionFilters, dataFilters)
+  @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+    val startTime = System.nanoTime()
+    val ret = relation.location.listFiles(partitionFilters, dataFilters)
+    val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
+
+    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
+    metrics("metadataTime").add(timeTaken)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("metadataTime") :: Nil)
+
+    ret
+  }
 
   override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -293,6 +305,8 @@ case class FileSourceScanExec(
 
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
+      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time (ms)"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
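For context, here is a minimal sketch of the driver-side metric pattern this patch introduces: register a metric with SQLMetrics.createMetric, time the driver-side work with System.nanoTime, add the elapsed milliseconds to the metric, and post the update to the SQL UI keyed by the current execution id. The standalone DriverMetricSketch object and the Thread.sleep stand-in below are illustrative assumptions, not part of the patch, and the APIs shown (SQLMetrics, SQLExecution) are Spark internals, so depending on the Spark version this may only compile from code placed inside the org.apache.spark.sql.execution packages.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.SQLMetrics

// Hypothetical standalone illustration of the pattern used in the diff above.
object DriverMetricSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("driver-metric-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Register a driver-side metric, mirroring the "metadataTime" metric added in the patch.
    val metadataTime = SQLMetrics.createMetric(sc, "metadata time (ms)")

    // Time a driver-side operation in milliseconds, the same nanoTime-based
    // pattern the patch wraps around relation.location.listFiles(...).
    val startTime = System.nanoTime()
    Thread.sleep(5) // stand-in for the real metadata listing work
    val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
    metadataTime.add(timeTaken)

    // Driver-side updates never flow back through a task, so they are pushed to the
    // SQL UI explicitly, keyed by the execution id of the running query. Outside of
    // an active SQL execution this local property is null and nothing is reported.
    val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sc, executionId, metadataTime :: Nil)

    spark.stop()
  }
}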