diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e6329cbd47576553b8fee871b639d550b79c8c87..dbe573dc64301ccbabc77028b311f079e8da18d0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -523,7 +523,9 @@ private[spark] class BlockManager( val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true) val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream) writer.registerCloseEventHandler(() => { - diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment()) + if (shuffleBlockManager.consolidateShuffleFiles) { + diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment()) + } val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false) blockInfo.put(blockId, myInfo) myInfo.markReady(writer.fileSegment().length) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 229178c09512ba25f6ca6c22cde1246de7d7761c..066e45a12b8c7a8e9784a42eba63c373e1b44378 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -63,8 +63,12 @@ class ShuffleBlockManager(blockManager: BlockManager) { val fileId = getUnusedFileId() val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - val filename = physicalFileName(shuffleId, bucketId, fileId) - blockManager.getDiskWriter(blockId, filename, serializer, bufferSize) + if (consolidateShuffleFiles) { + val filename = physicalFileName(shuffleId, bucketId, fileId) + blockManager.getDiskWriter(blockId, filename, serializer, bufferSize) + } else { + blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize) + } } new ShuffleWriterGroup(mapId, fileId, writers) } @@ -81,8 +85,9 @@ class ShuffleBlockManager(blockManager: BlockManager) { } private def recycleFileId(fileId: Int) { - if (!consolidateShuffleFiles) { return } // ensures we always generate new file id - unusedFileIds.add(fileId) + if (consolidateShuffleFiles) { + unusedFileIds.add(fileId) + } } private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {