diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8df8b4f83ee7d24c1accb6196c214a89f7cbb55f..792f29de60993fcbcb597d91e7aabf40942ab508 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,14 +20,15 @@ package org.apache.spark.util.collection
 import java.io._
 import java.util.Comparator
 
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
-import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
+import org.apache.spark.io.LZFCompressionCodec
+import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
 
 /**
  * An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -153,9 +154,38 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
 
-    val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
+    /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
+     * closes and re-opens serialization and compression streams within each file. This makes some
+     * assumptions about the way that serialization and compression streams work, specifically:
+     *
+     * 1) The serializer input streams do not pre-fetch data from the underlying stream.
+     *
+     * 2) Several compression streams can be opened, written to, and flushed on the write path
+     *    while only one compression input stream is created on the read path
+     *
+     * In practice (1) is only true for Java, so we add a special fix below to make it work for
+     * Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
+     *
+     * To avoid making these assumptions we should create an intermediate stream that batches
+     * objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
+     * This is a bit tricky because, within each segment, you'd need to track the total number
+     * of bytes written and then re-wind and write it at the beginning of the segment. This will
+     * most likely require using the file channel API.
+     */
+
+    val codec = new LZFCompressionCodec(sparkConf)
+
+    def wrapForCompression(outputStream: OutputStream) = {
+      blockManager.shouldCompress(blockId) match {
+        case true =>
+          codec.compressedOutputStream(outputStream)
+        case false =>
+          outputStream
+      }
+    }
+
     def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
-      compressStream, syncWrites)
+      wrapForCompression, syncWrites)
 
     var writer = getNewWriter
     var objectsWritten = 0
diff --git a/docs/configuration.md b/docs/configuration.md
index 4c2e9cc4798d41ef5225800662524d5cf9058b3c..be548e372dcd47ba677dbdccc762cba4a15b54e0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>spark.shuffle.spill.compress</td>
   <td>true</td>
   <td>
-    Whether to compress data spilled during shuffles.
+    Whether to compress data spilled during shuffles. If enabled, spill compression
+    always uses the `org.apache.spark.io.LZFCompressionCodec` codec,
+    regardless of the value of `spark.io.compression.codec`.
  </td>
 </tr>
 <tr>
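
Note (not part of the patch): assumption (2) in the comment above is what motivates hard-coding LZFCompressionCodec. Several LZF output streams written back-to-back into one file can be read with a single LZF input stream, which does not hold for the Snappy codec. A minimal standalone sketch of that property follows, assuming spark-core and its LZF dependency are on the classpath; the object name and sample strings are illustrative only.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.io.LZFCompressionCodec

object LzfConcatCheck {
  def main(args: Array[String]): Unit = {
    val codec = new LZFCompressionCodec(new SparkConf())
    val buffer = new ByteArrayOutputStream()

    // Open, write, and close a separate compression stream per "segment",
    // mirroring how the spill path above re-opens streams within one file.
    for (segment <- Seq("first segment ", "second segment")) {
      val out = codec.compressedOutputStream(buffer)
      out.write(segment.getBytes("UTF-8"))
      out.close()  // flushes this segment's compressed chunks into `buffer`
    }

    // A single decompression stream reads across both segments with LZF,
    // which is the property that does not hold for Snappy.
    val in = codec.compressedInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val bytes = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
    println(new String(bytes, "UTF-8"))  // expected: "first segment second segment"
  }
}

Spill compression can still be disabled entirely with spark.shuffle.spill.compress=false; this patch only pins which codec is used when it is enabled.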