Commit 540be6b1 authored by Patrick Wendell

Modified version of the fix which just removes all per-record tracking.

parent 224fbac0
@@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap

 import spark.storage.BlockManagerId
-import util.{NoOpTimedIterator, SystemTimedIterator, CompletionIterator, TimedIterator}
+import util.CompletionIterator

 private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
   override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = {
@@ -49,14 +49,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
     }

     val blockFetcherItr = blockManager.getMultiple(blocksByAddress)
-    val itr = if (System.getProperty("per.record.shuffle.metrics", "false").toBoolean) {
-      new SystemTimedIterator(blockFetcherItr.flatMap(unpackBlock))
-    } else {
-      new NoOpTimedIterator(blockFetcherItr.flatMap(unpackBlock))
-    }
+    val itr = blockFetcherItr.flatMap(unpackBlock)
     CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
       val shuffleMetrics = new ShuffleReadMetrics
-      shuffleMetrics.shuffleReadMillis = itr.getNetMillis
       shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
       shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
       shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
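
The simplified path relies on CompletionIterator to publish the shuffle metrics once the wrapped iterator is exhausted. A minimal sketch of that pattern, assuming it delegates to a sub-iterator and fires a callback exactly once on exhaustion (the name CompletionIteratorSketch and its shape are illustrative, not Spark's actual source):

// Sketch of the completion-callback iterator pattern used above.
// Assumption: the callback runs once, when hasNext first returns false.
class CompletionIteratorSketch[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
  private var completed = false
  def hasNext: Boolean = {
    val r = sub.hasNext
    if (!r && !completed) {
      completed = true
      completion()  // e.g. build and record the read metrics here
    }
    r
  }
  def next(): A = sub.next()
}

// Usage: prints 1, 2, 3, then "done" once the iterator is drained.
val it = new CompletionIteratorSketch(Iterator(1, 2, 3), () => println("done"))
it.foreach(println)
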
@@ -48,11 +48,6 @@ class ShuffleReadMetrics extends Serializable {
    */
   var localBlocksFetched: Int = _

-  /**
-   * Total time to read shuffle data
-   */
-  var shuffleReadMillis: Option[Long] = _
-
   /**
    * Total time that is spent blocked waiting for shuffle to fetch data
    */
spark/util/TimedIterator.scala (deleted by this commit)
-package spark.util
-
-/**
- * A utility for tracking the time an iterator takes to iterate through its elements.
- */
-trait TimedIterator {
-  def getNetMillis: Option[Long]
-  def getAverageTimePerItem: Option[Double]
-}
-
-/**
- * A TimedIterator which uses System.currentTimeMillis() on every call to next().
- *
- * In general, this should only be used if you expect it to take a considerable amount of time
- * (e.g. milliseconds) to get each element -- otherwise, the timing won't be very accurate,
- * and you are probably just adding more overhead.
- */
-class SystemTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator {
-  private var netMillis = 0l
-  private var nElems = 0
-  def hasNext = {
-    val start = System.currentTimeMillis()
-    val r = sub.hasNext
-    val end = System.currentTimeMillis()
-    netMillis += (end - start)
-    r
-  }
-  def next = {
-    val start = System.currentTimeMillis()
-    val r = sub.next
-    val end = System.currentTimeMillis()
-    netMillis += (end - start)
-    nElems += 1
-    r
-  }
-  def getNetMillis = Some(netMillis)
-  def getAverageTimePerItem = Some(netMillis / nElems.toDouble)
-}
-
-/**
- * A TimedIterator which doesn't perform any timing measurements.
- */
-class NoOpTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator {
-  def hasNext = sub.hasNext
-  def next = sub.next
-  def getNetMillis = None
-  def getAverageTimePerItem = None
-}
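
For reference, a minimal usage sketch of the deleted wrapper (illustrative only, not part of the commit). It shows the contract the fetcher relied on, and why the scaladoc above warns that the wrapper only pays off when each element is slow to produce:

object TimedIteratorExample {
  def main(args: Array[String]): Unit = {
    // Each element takes ~10 ms to produce, so the two currentTimeMillis
    // calls per record are cheap relative to the work being timed.
    val slow = Iterator.tabulate(5) { i => Thread.sleep(10); i }
    val timed = new SystemTimedIterator(slow)
    val sum = timed.sum  // consuming the iterator accumulates netMillis
    println(s"sum=$sum net=${timed.getNetMillis} avg=${timed.getAverageTimePerItem}")
  }
}

Shuffle records are generally produced far faster than once per millisecond, so per-record timing adds overhead without yielding accurate numbers, which is the rationale for removing the tracking entirely.
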
@@ -57,7 +57,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
       taskMetrics.shuffleReadMetrics should be ('defined)
       val sm = taskMetrics.shuffleReadMetrics.get
       sm.totalBlocksFetched should be > (0)
-      sm.shuffleReadMillis.get should be > (0l)
       sm.localBlocksFetched should be > (0)
       sm.remoteBlocksFetched should be (0)
       sm.remoteBytesRead should be (0l)