From 665e71d14debb8a7fc1547c614867a8c3b1f806a Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@apache.org>
Date: Thu, 28 Aug 2014 19:00:40 -0700
Subject: [PATCH] [SPARK-1912] Lazily initialize buffers for local shuffle
 blocks.

This is a simplified fix for SPARK-1912.

Author: Reynold Xin <rxin@apache.org>

Closes #2179 from rxin/SPARK-1912 and squashes the following commits:

b2f0e9e [Reynold Xin] Fix unit tests.
a8eddfe [Reynold Xin] [SPARK-1912] Lazily initialize buffers for local shuffle blocks.
---
 .../spark/storage/BlockFetcherIterator.scala  |  5 +---
 .../apache/spark/storage/BlockManager.scala   | 22 ++-------------
 .../storage/BlockFetcherIteratorSuite.scala   | 28 +++++++++++--------
 3 files changed, 20 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 4ab8ec8f0f..d07e6a1b18 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -196,11 +196,8 @@ object BlockFetcherIterator {
       // any memory that might exceed our maxBytesInFlight
       for (id <- localBlocksToFetch) {
         try {
-          // getLocalFromDisk never return None but throws BlockException
-          val iter = getLocalFromDisk(id, serializer).get
-          // Pass 0 as size since it's not in flight
           readMetrics.localBlocksFetched += 1
-          results.put(new FetchResult(id, 0, () => iter))
+          results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, serializer).get))
           logDebug("Got local block " + id)
         } catch {
           case e: Exception => {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1eb622c12a..cfe5b6c50a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1039,26 +1039,8 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-
-    def getIterator: Iterator[Any] = {
-      val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-      serializer.newInstance().deserializeStream(stream).asIterator
-    }
-
-    if (blockId.isShuffle) {
-      /* Reducer may need to read many local shuffle blocks and will wrap them into Iterators
-       * at the beginning. The wrapping will cost some memory (compression instance
-       * initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
-       * wrapping lazily to save memory. */
-      class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
-        lazy val proxy = f
-        override def hasNext: Boolean = proxy.hasNext
-        override def next(): Any = proxy.next()
-      }
-      new LazyProxyIterator(getIterator)
-    } else {
-      getIterator
-    }
+    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+    serializer.newInstance().deserializeStream(stream).asIterator
   }
 
   def stop(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
index 1591284383..fbfcb5156d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
@@ -76,20 +76,24 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
 
     iterator.initialize()
 
-    // 3rd getLocalFromDisk invocation should be failed
-    verify(blockManager, times(3)).getLocalFromDisk(any(), any())
+    // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
+    verify(blockManager, times(0)).getLocalFromDisk(any(), any())
 
     assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
     // the 2nd element of the tuple returned by iterator.next should be defined when fetching successfully
-    assert(iterator.next._2.isDefined, "1st element should be defined but is not actually defined") 
+    assert(iterator.next()._2.isDefined, "1st element should be defined but is not actually defined")
+    verify(blockManager, times(1)).getLocalFromDisk(any(), any())
+
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 1 element")
-    assert(iterator.next._2.isDefined, "2nd element should be defined but is not actually defined") 
+    assert(iterator.next()._2.isDefined, "2nd element should be defined but is not actually defined")
+    verify(blockManager, times(2)).getLocalFromDisk(any(), any())
+
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 2 elements")
     // 3rd fetch should be failed
-    assert(!iterator.next._2.isDefined, "3rd element should not be defined but is actually defined") 
-    assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
-    // Don't call next() after fetching non-defined element even if thare are rest of elements in the iterator.
-    // Otherwise, BasicBlockFetcherIterator hangs up.
+    intercept[Exception] {
+      iterator.next()
+    }
+    verify(blockManager, times(3)).getLocalFromDisk(any(), any())
   }
 
 
@@ -127,8 +131,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
 
     iterator.initialize()
 
-    // getLocalFromDis should be invoked for all of 5 blocks
-    verify(blockManager, times(5)).getLocalFromDisk(any(), any())
+    // Without exhausting the iterator, the iterator should be lazy and not call getLocalFromDisk.
+    verify(blockManager, times(0)).getLocalFromDisk(any(), any())
 
     assert(iterator.hasNext, "iterator should have 5 elements but actually has no elements")
     assert(iterator.next._2.isDefined, "All elements should be defined but 1st element is not actually defined") 
@@ -139,7 +143,9 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 3 elements")
     assert(iterator.next._2.isDefined, "All elements should be defined but 4th element is not actually defined") 
     assert(iterator.hasNext, "iterator should have 5 elements but actually has 4 elements")
-    assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined") 
+    assert(iterator.next._2.isDefined, "All elements should be defined but 5th element is not actually defined")
+
+    verify(blockManager, times(5)).getLocalFromDisk(any(), any())
   }
 
   test("block fetch from remote fails using BasicBlockFetcherIterator") {
-- 
GitLab