Skip to content
Snippets Groups Projects
Commit b55cade8 authored by Kay Ousterhout
Browse files

Remove the remoteFetchTime metric.

This metric is confusing: it adds up all of the time to fetch
shuffle inputs, but fetches often happen in parallel, so
remoteFetchTime can be much longer than the task execution time.

@squito it looks like you added this metric -- do you have a use case for it?

cc @shivaram -- I know you've looked at the shuffle performance a lot so chime in here if this metric has turned out to be useful for you!

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #62 from kayousterhout/remove_fetch_variable and squashes the following commits:

43341eb [Kay Ousterhout] Remove the remoteFetchTime metric.
parent 9d225a91
No related branches found
No related tags found
No related merge requests found
...@@ -79,7 +79,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin ...@@ -79,7 +79,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
val completionIter = CompletionIterator[T, Iterator[T]](itr, { val completionIter = CompletionIterator[T, Iterator[T]](itr, {
val shuffleMetrics = new ShuffleReadMetrics val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
......
...@@ -103,13 +103,6 @@ class ShuffleReadMetrics extends Serializable { ...@@ -103,13 +103,6 @@ class ShuffleReadMetrics extends Serializable {
*/ */
var fetchWaitTime: Long = _ var fetchWaitTime: Long = _
/**
* Total time spent fetching remote shuffle blocks. This aggregates the time spent fetching all
* input blocks. Since block fetches are both pipelined and parallelized, this can
* exceed fetchWaitTime and executorRunTime.
*/
var remoteFetchTime: Long = _
/** /**
* Total number of remote bytes read from the shuffle by this task * Total number of remote bytes read from the shuffle by this task
*/ */
......
...@@ -275,7 +275,6 @@ class JobLogger(val user: String, val logDirName: String) ...@@ -275,7 +275,6 @@ class JobLogger(val user: String, val logDirName: String)
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
" REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
" REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
" REMOTE_BYTES_READ=" + metrics.remoteBytesRead " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
case None => "" case None => ""
} }
......
...@@ -49,7 +49,6 @@ trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] wi ...@@ -49,7 +49,6 @@ trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] wi
def totalBlocks: Int def totalBlocks: Int
def numLocalBlocks: Int def numLocalBlocks: Int
def numRemoteBlocks: Int def numRemoteBlocks: Int
def remoteFetchTime: Long
def fetchWaitTime: Long def fetchWaitTime: Long
def remoteBytesRead: Long def remoteBytesRead: Long
} }
...@@ -79,7 +78,6 @@ object BlockFetcherIterator { ...@@ -79,7 +78,6 @@ object BlockFetcherIterator {
import blockManager._ import blockManager._
private var _remoteBytesRead = 0L private var _remoteBytesRead = 0L
private var _remoteFetchTime = 0L
private var _fetchWaitTime = 0L private var _fetchWaitTime = 0L
if (blocksByAddress == null) { if (blocksByAddress == null) {
...@@ -125,7 +123,6 @@ object BlockFetcherIterator { ...@@ -125,7 +123,6 @@ object BlockFetcherIterator {
future.onSuccess { future.onSuccess {
case Some(message) => { case Some(message) => {
val fetchDone = System.currentTimeMillis() val fetchDone = System.currentTimeMillis()
_remoteFetchTime += fetchDone - fetchStart
val bufferMessage = message.asInstanceOf[BufferMessage] val bufferMessage = message.asInstanceOf[BufferMessage]
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
for (blockMessage <- blockMessageArray) { for (blockMessage <- blockMessageArray) {
...@@ -241,7 +238,6 @@ object BlockFetcherIterator { ...@@ -241,7 +238,6 @@ object BlockFetcherIterator {
override def totalBlocks: Int = numLocal + numRemote override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote override def numRemoteBlocks: Int = numRemote
override def remoteFetchTime: Long = _remoteFetchTime
override def fetchWaitTime: Long = _fetchWaitTime override def fetchWaitTime: Long = _fetchWaitTime
override def remoteBytesRead: Long = _remoteBytesRead override def remoteBytesRead: Long = _remoteBytesRead
......
...@@ -129,7 +129,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc ...@@ -129,7 +129,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
sm.localBlocksFetched should be > (0) sm.localBlocksFetched should be > (0)
sm.remoteBlocksFetched should be (0) sm.remoteBlocksFetched should be (0)
sm.remoteBytesRead should be (0l) sm.remoteBytesRead should be (0l)
sm.remoteFetchTime should be (0l)
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment