diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6e450081dcb11caf0b25bcabe7b038ae1c47bf3b..a41286d3e4a009188f881dd696a2e47205803ffb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1015,8 +1015,26 @@ private[spark] class BlockManager( bytes: ByteBuffer, serializer: Serializer = defaultSerializer): Iterator[Any] = { bytes.rewind() - val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) - serializer.newInstance().deserializeStream(stream).asIterator + + def getIterator = { + val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) + serializer.newInstance().deserializeStream(stream).asIterator + } + + if (blockId.isShuffle) { + // Reducer may need to read many local shuffle blocks and will wrap them into Iterators + // at the beginning. The wrapping will cost some memory (compression instance + // initialization, etc.). Reducer read shuffle blocks one by one so we could do the + // wrapping lazily to save memory. + class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] { + lazy val proxy = f + override def hasNext: Boolean = proxy.hasNext + override def next(): Any = proxy.next() + } + new LazyProxyIterator(getIterator) + } else { + getIterator + } } def stop() {