diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 45fc8cbf7e21707a5cb3badfd1fd2056beb510db..53b0389c3a67373394e8a30a9a12a2401718c45a 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import spark.storage.{DelegateBlockFetchTracker, BlockManagerId} -import util.{CleanupIterator, TimedIterator} +import util.{CompletionIterator, TimedIterator} private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = { @@ -51,7 +51,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin val blockFetcherItr = blockManager.getMultiple(blocksByAddress) val itr = new TimedIterator(blockFetcherItr.flatMap(unpackBlock)) with DelegateBlockFetchTracker itr.setDelegate(blockFetcherItr) - CleanupIterator[(K,V), Iterator[(K,V)]](itr, { + CompletionIterator[(K,V), Iterator[(K,V)]](itr, { val shuffleMetrics = new ShuffleReadMetrics shuffleMetrics.shuffleReadMillis = itr.getNetMillis shuffleMetrics.remoteFetchTime = itr.remoteFetchTime diff --git a/core/src/main/scala/spark/util/CleanupIterator.scala b/core/src/main/scala/spark/util/CleanupIterator.scala deleted file mode 100644 index d2093c023011b6dfe9e84b982c886a9fcd237a0e..0000000000000000000000000000000000000000 --- a/core/src/main/scala/spark/util/CleanupIterator.scala +++ /dev/null @@ -1,25 +0,0 @@ -package spark.util - -/** - * Wrapper around an iterator which calls a cleanup method when its finished iterating through its elements - */ -abstract class CleanupIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{ - def next = sub.next - def hasNext = { - val r = sub.hasNext - if (!r) { - cleanup - } - r - } - - def cleanup -} - -object CleanupIterator { - def apply[A, I <: Iterator[A]](sub: I, cleanupFunction: => Unit) : CleanupIterator[A,I] = { - new CleanupIterator[A,I](sub) { - def cleanup = cleanupFunction - } - } -} \ No newline at end of file diff --git a/core/src/main/scala/spark/util/CompletionIterator.scala b/core/src/main/scala/spark/util/CompletionIterator.scala new file mode 100644 index 0000000000000000000000000000000000000000..81391837805967141fcadf678e414a0dea7c7db6 --- /dev/null +++ b/core/src/main/scala/spark/util/CompletionIterator.scala @@ -0,0 +1,25 @@ +package spark.util + +/** + * Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements + */ +abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{ + def next = sub.next + def hasNext = { + val r = sub.hasNext + if (!r) { + completion + } + r + } + + def completion() +} + +object CompletionIterator { + def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = { + new CompletionIterator[A,I](sub) { + def completion() = completionFunction + } + } +} \ No newline at end of file