Skip to content
Snippets Groups Projects
Commit d4df4749 authored by Reynold Xin
Browse files

Merge pull request #115 from aarondav/shuffle-fix

Eliminate extra memory usage when shuffle file consolidation is disabled

Otherwise, we see SPARK-946 even when shuffle file consolidation is disabled.
Fixing SPARK-946 is still forthcoming.
parents e018f2d0 4261e834
No related branches found
No related tags found
No related merge requests found
...@@ -523,7 +523,9 @@ private[spark] class BlockManager( ...@@ -523,7 +523,9 @@ private[spark] class BlockManager(
val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true) val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true)
val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream) val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
writer.registerCloseEventHandler(() => { writer.registerCloseEventHandler(() => {
diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment()) if (shuffleBlockManager.consolidateShuffleFiles) {
diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
}
val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false) val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
blockInfo.put(blockId, myInfo) blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.fileSegment().length) myInfo.markReady(writer.fileSegment().length)
......
...@@ -63,8 +63,12 @@ class ShuffleBlockManager(blockManager: BlockManager) { ...@@ -63,8 +63,12 @@ class ShuffleBlockManager(blockManager: BlockManager) {
val fileId = getUnusedFileId() val fileId = getUnusedFileId()
val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val filename = physicalFileName(shuffleId, bucketId, fileId) if (consolidateShuffleFiles) {
blockManager.getDiskWriter(blockId, filename, serializer, bufferSize) val filename = physicalFileName(shuffleId, bucketId, fileId)
blockManager.getDiskWriter(blockId, filename, serializer, bufferSize)
} else {
blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize)
}
} }
new ShuffleWriterGroup(mapId, fileId, writers) new ShuffleWriterGroup(mapId, fileId, writers)
} }
...@@ -81,8 +85,9 @@ class ShuffleBlockManager(blockManager: BlockManager) { ...@@ -81,8 +85,9 @@ class ShuffleBlockManager(blockManager: BlockManager) {
} }
private def recycleFileId(fileId: Int) { private def recycleFileId(fileId: Int) {
if (!consolidateShuffleFiles) { return } // ensures we always generate new file id if (consolidateShuffleFiles) {
unusedFileIds.add(fileId) unusedFileIds.add(fileId)
}
} }
private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = { private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment