diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 4ab8ec8f0ff3b53814180b0575b29eb151c6a8ac..d07e6a1b1836cad3f0c1dc6d8ab92894170e3306 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -196,11 +196,8 @@ object BlockFetcherIterator { // any memory that might exceed our maxBytesInFlight for (id <- localBlocksToFetch) { try { - // getLocalFromDisk never return None but throws BlockException - val iter = getLocalFromDisk(id, serializer).get - // Pass 0 as size since it's not in flight readMetrics.localBlocksFetched += 1 - results.put(new FetchResult(id, 0, () => iter)) + results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, serializer).get)) logDebug("Got local block " + id) } catch { case e: Exception => { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1eb622c12a79f8ee19c00c4be9cb91c2249c85dc..cfe5b6c50aea27738be457a5973194a2434fe27e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1039,26 +1039,8 @@ private[spark] class BlockManager( bytes: ByteBuffer, serializer: Serializer = defaultSerializer): Iterator[Any] = { bytes.rewind() - - def getIterator: Iterator[Any] = { - val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) - serializer.newInstance().deserializeStream(stream).asIterator - } - - if (blockId.isShuffle) { - /* Reducer may need to read many local shuffle blocks and will wrap them into Iterators - * at the beginning. The wrapping will cost some memory (compression instance - * initialization, etc.). Reducer reads shuffle blocks one by one so we could do the - * wrapping lazily to save memory. */ - class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] { - lazy val proxy = f - override def hasNext: Boolean = proxy.hasNext - override def next(): Any = proxy.next() - } - new LazyProxyIterator(getIterator) - } else { - getIterator - } + val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) + serializer.newInstance().deserializeStream(stream).asIterator } def stop(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala index 159128438390dd72bd899f6e29746b488aed4a5d..fbfcb5156d496603524baafac77ae2750bfd7208 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala @@ -76,20 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { iterator.initialize() - // 3rd getLocalFromDisk invocation should be failed - verify(blockManager, times(3)).getLocalFromDisk(any(), any()) + // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk. + verify(blockManager, times(0)).getLocalFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements") // the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully - assert(iterator.next._2.isDefined, "1st element should be defined but is not actually defined") + assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually defined") + verify(blockManager, times(1)).getLocalFromDisk(any(), any()) + assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element") - assert(iterator.next._2.isDefined, "2nd element should be defined but is not actually defined") + assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined") + verify(blockManager, times(2)).getLocalFromDisk(any(), any()) + assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements") // 3rd fetch should be failed - assert(!iterator.next._2.isDefined, "3rd element should not be defined but is actually defined") - assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements") - // Don't call next() after fetching non-defined element even if thare are rest of elements in the iterator. - // Otherwise, BasicBlockFetcherIterator hangs up. + intercept[Exception] { + iterator.next() + } + verify(blockManager, times(3)).getLocalFromDisk(any(), any()) } @@ -127,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { iterator.initialize() - // getLocalFromDis should be invoked for all of 5 blocks - verify(blockManager, times(5)).getLocalFromDisk(any(), any()) + // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk. + verify(blockManager, times(0)).getLocalFromDisk(any(), any()) assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements") assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined") @@ -139,7 +143,9 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements") assert(iterator.next._2.isDefined, "All elements should be defined but 4th element is not actually defined") assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements") - assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined") + assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined") + + verify(blockManager, times(5)).getLocalFromDisk(any(), any()) } test("block fetch from remote fails using BasicBlockFetcherIterator") {