diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 53b0389c3a67373394e8a30a9a12a2401718c45a..c27ed3640611948743dee57ad469dc851b3c8b75 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -55,7 +55,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin val shuffleMetrics = new ShuffleReadMetrics shuffleMetrics.shuffleReadMillis = itr.getNetMillis shuffleMetrics.remoteFetchTime = itr.remoteFetchTime - shuffleMetrics.remoteFetchWaitTime = itr.remoteFetchWaitTime + shuffleMetrics.fetchWaitTime = itr.fetchWaitTime shuffleMetrics.remoteBytesRead = itr.remoteBytesRead shuffleMetrics.totalBlocksFetched = itr.totalBlocks shuffleMetrics.localBlocksFetched = itr.numLocalBlocks diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala index b9c07830f5c94a62440a7b53fb81721a5c0400e6..93bbb6b4587a5b6773ad9597e0b767c0e0531688 100644 --- a/core/src/main/scala/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/spark/executor/TaskMetrics.scala @@ -54,9 +54,9 @@ class ShuffleReadMetrics extends Serializable { var shuffleReadMillis: Long = _ /** - * Total time that is spent blocked waiting for shuffle to fetch remote data + * Total time that is spent blocked waiting for shuffle to fetch data */ - var remoteFetchWaitTime: Long = _ + var fetchWaitTime: Long = _ /** * The total amount of time for all the shuffle fetches. This adds up time from overlapping diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala index 21185227ab0c6ff822c73e4e91c135434a92d726..a65140b145833c70e584936785ac8fe94d6b0bc2 100644 --- a/core/src/main/scala/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/spark/scheduler/SparkListener.scala @@ -31,7 +31,7 @@ class StatsReportListener extends SparkListener with Logging { showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten}) //fetch & io - showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.remoteFetchWaitTime}) + showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime}) showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead}) showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize)) @@ -137,7 +137,7 @@ case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], othe object RuntimePercentage { def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = { val denom = totalTime.toDouble - val fetchTime = metrics.shuffleReadMetrics.map{_.remoteFetchWaitTime} + val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime} val fetch = fetchTime.map{_ / denom} val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom val other = 1.0 - (exec + fetch.getOrElse(0d)) diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index a76253ea14d3e108f251501c948383d143b666f6..9e1bde3fbe44e7196f5e3bb46b4edd71729e8f4b 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -67,8 +67,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon logInfo("Size of task " + idInJob + " is " + bytes.limit + " bytes") val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes) updateDependencies(taskFiles, taskJars) // Download any files added with addFile + val deserStart = System.currentTimeMillis() val deserializedTask = ser.deserialize[Task[_]]( taskBytes, Thread.currentThread.getContextClassLoader) + val deserTime = System.currentTimeMillis() - deserStart // Run it val result: Any = deserializedTask.run(attemptId) @@ -77,15 +79,19 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon // executor does. This is useful to catch serialization errors early // on in development (so when users move their local Spark programs // to the cluster, they don't get surprised by serialization errors). - val resultToReturn = ser.deserialize[Any](ser.serialize(result)) + val serResult = ser.serialize(result) + deserializedTask.metrics.get.resultSize = serResult.limit() + val resultToReturn = ser.deserialize[Any](serResult) val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]]( ser.serialize(Accumulators.values)) logInfo("Finished " + task) info.markSuccessful() + deserializedTask.metrics.get.executorRunTime = info.duration.toInt //close enough + deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt // If the threadpool has not already been shutdown, notify DAGScheduler if (!Thread.currentThread().isInterrupted) - listener.taskEnded(task, Success, resultToReturn, accumUpdates, info, null) + listener.taskEnded(task, Success, resultToReturn, accumUpdates, info, deserializedTask.metrics.getOrElse(null)) } catch { case t: Throwable => { logError("Exception in task " + idInJob, t) diff --git a/core/src/main/scala/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/spark/storage/BlockFetchTracker.scala index ababb04305cde82f18665b6b65298af2857faf0c..993aece1f7f76b8550dd1b6815e98c33d60e1a0a 100644 --- a/core/src/main/scala/spark/storage/BlockFetchTracker.scala +++ b/core/src/main/scala/spark/storage/BlockFetchTracker.scala @@ -5,6 +5,6 @@ private[spark] trait BlockFetchTracker { def numLocalBlocks: Int def numRemoteBlocks: Int def remoteFetchTime : Long - def remoteFetchWaitTime: Long + def fetchWaitTime: Long def remoteBytesRead : Long } diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 3118d3d412b2f207f502c0cbbbd8479837217fb3..210061e9726b4108e4ebe1c00c138f2ed3d4e85b 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -903,7 +903,7 @@ class BlockFetcherIterator( private var _remoteBytesRead = 0l private var _remoteFetchTime = 0l - private var _remoteFetchWaitTime = 0l + private var _fetchWaitTime = 0l if (blocksByAddress == null) { throw new IllegalArgumentException("BlocksByAddress is null") @@ -1046,7 +1046,7 @@ class BlockFetcherIterator( val startFetchWait = System.currentTimeMillis() val result = results.take() val stopFetchWait = System.currentTimeMillis() - _remoteFetchWaitTime += (stopFetchWait - startFetchWait) + _fetchWaitTime += (stopFetchWait - startFetchWait) bytesInFlight -= result.size while (!fetchRequests.isEmpty && (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { @@ -1061,7 +1061,7 @@ class BlockFetcherIterator( def numRemoteBlocks = remoteBlockIds.size def remoteFetchTime = _remoteFetchTime - def remoteFetchWaitTime = _remoteFetchWaitTime + def fetchWaitTime = _fetchWaitTime def remoteBytesRead = _remoteBytesRead diff --git a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala index 5c491877bad1929cbda15eb414b252f99e00e382..f6c28dce52ad554bdc06d20c1c0a2ad53d990018 100644 --- a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala +++ b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala @@ -7,6 +7,6 @@ private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker { def numLocalBlocks = delegate.numLocalBlocks def numRemoteBlocks = delegate.numRemoteBlocks def remoteFetchTime = delegate.remoteFetchTime - def remoteFetchWaitTime = delegate.remoteFetchWaitTime + def fetchWaitTime = delegate.fetchWaitTime def remoteBytesRead = delegate.remoteBytesRead } diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..2f5af10e69c7f2293ee7e071893df883330560b7 --- /dev/null +++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala @@ -0,0 +1,86 @@ +package spark.scheduler + +import org.scalatest.FunSuite +import spark.{SparkContext, LocalSparkContext} +import scala.collection.mutable +import org.scalatest.matchers.ShouldMatchers +import spark.SparkContext._ + +/** + * + */ + +class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers { + + test("local metrics") { + sc = new SparkContext("local[4]", "test") + val listener = new SaveStageInfo + sc.addSparkListener(listener) + sc.addSparkListener(new StatsReportListener) + //just to make sure some of the tasks take a noticeable amount of time + val w = {i:Int => + if (i == 0) + Thread.sleep(100) + i + } + + val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)} + d.count + listener.stageInfos.size should be (1) + + val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1") + + val d3 = d.map{i => w(i) -> (0 to (i % 5))}.setName("shuffle input 2") + + val d4 = d2.cogroup(d3, 64).map{case(k,(v1,v2)) => w(k) -> (v1.size, v2.size)} + d4.setName("A Cogroup") + + d4.collectAsMap + + listener.stageInfos.size should be (4) + listener.stageInfos.foreach {stageInfo => + //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms + checkNonZeroAvg(stageInfo.taskInfos.map{_._1.duration}, stageInfo + " duration") + checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorRunTime.toLong}, stageInfo + " executorRunTime") + checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong}, stageInfo + " executorDeserializeTime") + if (stageInfo.stage.rdd.name == d4.name) { + checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime") + } + + stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) => + taskMetrics.resultSize should be > (0l) + if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) { + taskMetrics.shuffleWriteMetrics should be ('defined) + taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l) + } + if (stageInfo.stage.rdd.name == d4.name) { + taskMetrics.shuffleReadMetrics should be ('defined) + val sm = taskMetrics.shuffleReadMetrics.get + sm.totalBlocksFetched should be > (0) + sm.shuffleReadMillis should be > (0l) + sm.localBlocksFetched should be > (0) + sm.remoteBlocksFetched should be (0) + sm.remoteBytesRead should be (0l) + sm.remoteFetchTime should be (0l) + } + } + } + } + + def checkNonZeroAvg(m: Traversable[Long], msg: String) { + assert(m.sum / m.size.toDouble > 0.0, msg) + } + + def isStage(stageInfo: StageInfo, rddNames: Set[String], excludedNames: Set[String]) = { + val names = Set(stageInfo.stage.rdd.name) ++ stageInfo.stage.rdd.dependencies.map{_.rdd.name} + !names.intersect(rddNames).isEmpty && names.intersect(excludedNames).isEmpty + } + + class SaveStageInfo extends SparkListener { + val stageInfos = mutable.Buffer[StageInfo]() + def onStageCompleted(stage: StageCompleted) { + stageInfos += stage.stageInfo + } + } + +}