diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index fb73636162af960846e37975469ade711c9d946c..3d9b09ec33e2af61a8fc81cb6d7798b2ef6f3ab4 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -358,7 +358,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] { val fileStream = new FileInputStream(file) val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize) - val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) + + val shouldCompress = blockManager.shouldCompress(blockId) + val compressionCodec = new LZFCompressionCodec(sparkConf) + val compressedStream = + if (shouldCompress) { + compressionCodec.compressedInputStream(bufferedStream) + } else { + bufferedStream + } var deserializeStream = ser.deserializeStream(compressedStream) var objectsRead = 0