Commit c324ac10 authored by Patrick Wendell

Force use of LZF when spilling data

parent 1b299142
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,14 +20,15 @@ package org.apache.spark.util.collection
 import java.io._
 import java.util.Comparator
 
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
-import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
+import org.apache.spark.io.LZFCompressionCodec
+import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
 
 /**
  * An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -153,9 +154,38 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
-    val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
+
+    /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
+     * closes and re-opens serialization and compression streams within each file. This makes some
+     * assumptions about the way that serialization and compression streams work, specifically:
+     *
+     * 1) The serializer input streams do not pre-fetch data from the underlying stream.
+     *
+     * 2) Several compression streams can be opened, written to, and flushed on the write path
+     *    while only one compression input stream is created on the read path
+     *
+     * In practice (1) is only true for Java, so we add a special fix below to make it work for
+     * Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
+     *
+     * To avoid making these assumptions we should create an intermediate stream that batches
+     * objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
+     * This is a bit tricky because, within each segment, you'd need to track the total number
+     * of bytes written and then re-wind and write it at the beginning of the segment. This will
+     * most likely require using the file channel API.
+     */
+    val codec = new LZFCompressionCodec(sparkConf)
+    def wrapForCompression(outputStream: OutputStream) = {
+      blockManager.shouldCompress(blockId) match {
+        case true =>
+          codec.compressedOutputStream(outputStream)
+        case false =>
+          outputStream
+      }
+    }
+
     def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
-      compressStream, syncWrites)
+      wrapForCompression, syncWrites)
+
     var writer = getNewWriter
     var objectsWritten = 0
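
The note in this hunk leans on a framing property of LZF: its chunks are self-delimiting, so a single decompression stream can read back the concatenation of several independently flushed compression streams, which is exactly what assumption (2) requires. The following standalone sketch is not part of the commit; it illustrates that property through the Spark 0.9-era CompressionCodec API, and the object name and batch strings are made up for the example.

import java.io._

import org.apache.spark.SparkConf
import org.apache.spark.io.LZFCompressionCodec

object LzfConcatSketch {
  def main(args: Array[String]): Unit = {
    val codec = new LZFCompressionCodec(new SparkConf())
    val file = File.createTempFile("spill-sketch", ".lzf")

    // Write path: open a fresh compression stream per batch and flush each
    // one into the same underlying file stream (assumption (2) above).
    val fileOut = new FileOutputStream(file)
    Seq("first batch", "second batch").foreach { batch =>
      val out = codec.compressedOutputStream(fileOut)
      out.write((batch + "\n").getBytes("UTF-8"))
      out.flush() // emit the buffered LZF chunk without closing fileOut
    }
    fileOut.close()

    // Read path: one compression input stream over the whole file. With LZF
    // this yields both batches; a Snappy stream would fail on the second
    // stream's header partway through.
    val in = codec.compressedInputStream(new FileInputStream(file))
    val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))
    var line = reader.readLine()
    while (line != null) {
      println(line)
      line = reader.readLine()
    }
    reader.close()
  }
}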
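The intermediate-stream alternative proposed at the end of the note can be pictured, on the read side, as a wrapper that serves exactly one segment's worth of bytes and then reports EOF, so higher-layer deserialization streams can never pre-fetch across a segment boundary. This fragment is hypothetical (the class name and the length-prefix framing are illustrative, not from the commit); the write side would additionally record each segment's byte count, e.g. by re-winding through the file channel API as the note suggests.

import java.io.{FilterInputStream, InputStream}

// Hypothetical read-side wrapper: expose exactly `limit` bytes of the
// underlying stream, then report EOF to everything layered on top.
class SegmentInputStream(underlying: InputStream, limit: Long)
  extends FilterInputStream(underlying) {

  private var remaining = limit

  override def read(): Int = {
    if (remaining <= 0) {
      -1
    } else {
      val b = underlying.read()
      if (b >= 0) remaining -= 1
      b
    }
  }

  override def read(buf: Array[Byte], off: Int, len: Int): Int = {
    if (remaining <= 0) {
      -1
    } else {
      val n = underlying.read(buf, off, math.min(len.toLong, remaining).toInt)
      if (n > 0) remaining -= n
      n
    }
  }
}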
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>spark.shuffle.spill.compress</td>
   <td>true</td>
   <td>
-    Whether to compress data spilled during shuffles.
+    Whether to compress data spilled during shuffles. If enabled, spill compression
+    always uses the `org.apache.spark.io.LZFCompressionCodec` codec,
+    regardless of the value of `spark.io.compression.codec`.
   </td>
 </tr>
 <tr>
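
A practical consequence of the documented behavior: `spark.io.compression.codec` now governs other compressed data such as shuffle outputs, while spilled data is either LZF-compressed or not compressed at all. A hypothetical configuration sketch (the application name and codec choice are illustrative):

import org.apache.spark.SparkConf

// Illustrative configuration: the chosen codec applies to shuffle outputs,
// while spill compression is disabled entirely here.
val conf = new SparkConf()
  .setAppName("SpillConfigSketch") // hypothetical app name
  .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
  .set("spark.shuffle.spill.compress", "false")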