diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 83c22b1f144c6d6d6c277c2c09855e9151e27507..ce61d27448b374173c3b68737cd3627f1f081022 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import spark.storage.BlockManagerId -import util.{NoOpTimedIterator, SystemTimedIterator, CompletionIterator, TimedIterator} +import util.CompletionIterator private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = { @@ -49,14 +49,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin } val blockFetcherItr = blockManager.getMultiple(blocksByAddress) - val itr = if (System.getProperty("per.record.shuffle.metrics", "false").toBoolean) { - new SystemTimedIterator(blockFetcherItr.flatMap(unpackBlock)) - } else { - new NoOpTimedIterator(blockFetcherItr.flatMap(unpackBlock)) - } + val itr = blockFetcherItr.flatMap(unpackBlock) CompletionIterator[(K,V), Iterator[(K,V)]](itr, { val shuffleMetrics = new ShuffleReadMetrics - shuffleMetrics.shuffleReadMillis = itr.getNetMillis shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala index 45f6d43971119e5275d7c8d848ab15dba262e559..a7c56c237199b2cb98366219282d02bcc521389b 100644 --- a/core/src/main/scala/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/spark/executor/TaskMetrics.scala @@ -48,11 +48,6 @@ class ShuffleReadMetrics extends Serializable { */ var localBlocksFetched: Int = _ - /** - * Total time to read shuffle data - */ - var shuffleReadMillis: Option[Long] = _ - /** * Total time that is spent blocked waiting for shuffle to fetch data */ diff --git a/core/src/main/scala/spark/util/TimedIterator.scala b/core/src/main/scala/spark/util/TimedIterator.scala deleted file mode 100644 index 49f1276b4e19382adff96cb47d5fa48512ee5e8a..0000000000000000000000000000000000000000 --- a/core/src/main/scala/spark/util/TimedIterator.scala +++ /dev/null @@ -1,50 +0,0 @@ -package spark.util - -/** - * A utility for tracking the the time an iterator takes to iterate through its elements. - */ -trait TimedIterator { - def getNetMillis: Option[Long] - def getAverageTimePerItem: Option[Double] -} - -/** - * A TimedIterator which uses System.currentTimeMillis() on every call to next(). - * - * In general, this should only be used if you expect it to take a considerable amount of time - * (eg. milliseconds) to get each element -- otherwise, the timing won't be very accurate, - * and you are probably just adding more overhead - */ -class SystemTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator { - private var netMillis = 0l - private var nElems = 0 - def hasNext = { - val start = System.currentTimeMillis() - val r = sub.hasNext - val end = System.currentTimeMillis() - netMillis += (end - start) - r - } - def next = { - val start = System.currentTimeMillis() - val r = sub.next - val end = System.currentTimeMillis() - netMillis += (end - start) - nElems += 1 - r - } - - def getNetMillis = Some(netMillis) - def getAverageTimePerItem = Some(netMillis / nElems.toDouble) - -} - -/** - * A TimedIterator which doesn't perform any timing measurements. - */ -class NoOpTimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] with TimedIterator { - def hasNext = sub.hasNext - def next = sub.next - def getNetMillis = None - def getAverageTimePerItem = None -} diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala index 5ccab369db214f053b96c3357aca9014b2c078d1..42a87d8b90fe57c111d4d438c8f033207d0fedbd 100644 --- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala @@ -57,7 +57,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc taskMetrics.shuffleReadMetrics should be ('defined) val sm = taskMetrics.shuffleReadMetrics.get sm.totalBlocksFetched should be > (0) - sm.shuffleReadMillis.get should be > (0l) sm.localBlocksFetched should be > (0) sm.remoteBlocksFetched should be (0) sm.remoteBytesRead should be (0l)