diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index c27ed3640611948743dee57ad469dc851b3c8b75..2987dbbe5870e1fcde3b6be6882a6b49bc5e34d4 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -4,8 +4,8 @@ import executor.{ShuffleReadMetrics, TaskMetrics}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
-import spark.storage.{DelegateBlockFetchTracker, BlockManagerId}
-import util.{CompletionIterator, TimedIterator}
+import spark.storage.BlockManagerId
+import spark.util.CompletionIterator
 
 private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
   override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = {
@@ -49,17 +49,15 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
     }
 
     val blockFetcherItr = blockManager.getMultiple(blocksByAddress)
-    val itr = new TimedIterator(blockFetcherItr.flatMap(unpackBlock)) with DelegateBlockFetchTracker
-    itr.setDelegate(blockFetcherItr)
+    val itr = blockFetcherItr.flatMap(unpackBlock)
     CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
       val shuffleMetrics = new ShuffleReadMetrics
-      shuffleMetrics.shuffleReadMillis = itr.getNetMillis
-      shuffleMetrics.remoteFetchTime = itr.remoteFetchTime
-      shuffleMetrics.fetchWaitTime = itr.fetchWaitTime
-      shuffleMetrics.remoteBytesRead = itr.remoteBytesRead
-      shuffleMetrics.totalBlocksFetched = itr.totalBlocks
-      shuffleMetrics.localBlocksFetched = itr.numLocalBlocks
-      shuffleMetrics.remoteBlocksFetched = itr.numRemoteBlocks
+      shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
+      shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
+      shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
+      shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
+      shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
+      shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
       metrics.shuffleReadMetrics = Some(shuffleMetrics)
     })
   }
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 93bbb6b4587a5b6773ad9597e0b767c0e0531688..a7c56c237199b2cb98366219282d02bcc521389b 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -48,11 +48,6 @@ class ShuffleReadMetrics extends Serializable {
    */
   var localBlocksFetched: Int = _
 
-  /**
-   * Total time to read shuffle data
-   */
-  var shuffleReadMillis: Long = _
-
   /**
    * Total time that is spent blocked waiting for shuffle to fetch data
    */
diff --git a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
deleted file mode 100644
index f6c28dce52ad554bdc06d20c1c0a2ad53d990018..0000000000000000000000000000000000000000
--- a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package spark.storage
-
-private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker {
-  var delegate : BlockFetchTracker = _
-  def setDelegate(d: BlockFetchTracker) {delegate = d}
-  def totalBlocks = delegate.totalBlocks
-  def numLocalBlocks = delegate.numLocalBlocks
-  def numRemoteBlocks = delegate.numRemoteBlocks
-  def remoteFetchTime = delegate.remoteFetchTime
-  def fetchWaitTime = delegate.fetchWaitTime
-  def remoteBytesRead = delegate.remoteBytesRead
-}
diff --git a/core/src/main/scala/spark/util/TimedIterator.scala b/core/src/main/scala/spark/util/TimedIterator.scala
deleted file mode 100644
index 539b01f4ce47d3ff7237ca619d220aded7b04ee1..0000000000000000000000000000000000000000
--- a/core/src/main/scala/spark/util/TimedIterator.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package spark.util
-
-/**
- * A utility for tracking the total time an iterator takes to iterate through its elements.
- *
- * In general, this should only be used if you expect it to take a considerable amount of time
- * (eg. milliseconds) to get each element -- otherwise, the timing won't be very accurate,
- * and you are probably just adding more overhead
- */
-class TimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] {
-  private var netMillis = 0l
-  private var nElems = 0
-  def hasNext = {
-    val start = System.currentTimeMillis()
-    val r = sub.hasNext
-    val end = System.currentTimeMillis()
-    netMillis += (end - start)
-    r
-  }
-  def next = {
-    val start = System.currentTimeMillis()
-    val r = sub.next
-    val end = System.currentTimeMillis()
-    netMillis += (end - start)
-    nElems += 1
-    r
-  }
-
-  def getNetMillis = netMillis
-  def getAverageTimePerItem = netMillis / nElems.toDouble
-
-}
diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
index 2f5af10e69c7f2293ee7e071893df883330560b7..42a87d8b90fe57c111d4d438c8f033207d0fedbd 100644
--- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
@@ -57,7 +57,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
         taskMetrics.shuffleReadMetrics should be ('defined)
         val sm = taskMetrics.shuffleReadMetrics.get
         sm.totalBlocksFetched should be > (0)
-        sm.shuffleReadMillis should be > (0l)
         sm.localBlocksFetched should be > (0)
         sm.remoteBlocksFetched should be (0)
         sm.remoteBytesRead should be (0l)
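
Note on the pattern this change relies on: the removed TimedIterator wrapped every hasNext/next call in a pair of System.currentTimeMillis() calls (its own scaladoc warns this is only accurate when each element is slow to produce), so shuffleReadMillis is dropped outright and the remaining metrics are read directly off blockFetcherItr, which already tracks them. Below is a minimal, self-contained sketch of the completion-callback idiom that makes this safe. SimpleCompletionIterator and FakeFetchIterator are hypothetical stand-ins for illustration, not Spark's actual spark.util.CompletionIterator or fetch iterator.

// Hypothetical sketch: an iterator that runs a callback exactly once,
// when the wrapped iterator is drained.
class SimpleCompletionIterator[A](sub: Iterator[A], completion: => Unit) extends Iterator[A] {
  private var completed = false

  def next(): A = sub.next()

  def hasNext: Boolean = {
    val r = sub.hasNext
    if (!r && !completed) {
      // Fire the callback only on first exhaustion.
      completed = true
      completion
    }
    r
  }
}

object CompletionIteratorSketch {
  // Stands in for the result of blockManager.getMultiple(...): an iterator
  // that accumulates fetch statistics as it is consumed.
  class FakeFetchIterator extends Iterator[Int] {
    private val data = Iterator(1, 2, 3)
    var remoteBytesRead = 0L
    def hasNext: Boolean = data.hasNext
    def next(): Int = { remoteBytesRead += 8L; data.next() }
  }

  def main(args: Array[String]): Unit = {
    val fetcher = new FakeFetchIterator
    // As in BlockStoreShuffleFetcher above: read the tracker fields off the
    // fetch iterator itself once iteration finishes, not off a timing wrapper.
    val itr = new SimpleCompletionIterator[Int](fetcher,
      println(s"remoteBytesRead = ${fetcher.remoteBytesRead}"))
    itr.foreach(println) // prints 1, 2, 3, then the metrics line
  }
}

The design point the sketch illustrates: because the callback fires only after the underlying iterator is exhausted, the tracker fields are guaranteed to reflect the complete fetch when ShuffleReadMetrics is populated, with zero per-element timing overhead.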