From 596f18479e5cb14fdc0af800f97b7b13e963fe5b Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Sat, 26 Oct 2013 22:25:24 -0700
Subject: [PATCH] Eliminate extra memory usage when shuffle file consolidation
 is disabled

Otherwise, the unnecessary block-to-file-segment mappings reproduce the memory
usage described in SPARK-946 even when shuffle file consolidation is disabled.
A proper fix for SPARK-946 is still forthcoming.
---
 .../org/apache/spark/storage/BlockManager.scala     |  5 ++++-
 .../apache/spark/storage/ShuffleBlockManager.scala  | 13 +++++++++----
 2 files changed, 13 insertions(+), 5 deletions(-)
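
Reviewer note (this section, after the ---, is not part of the commit
message): with consolidation off, each shuffle block now gets a dedicated
file named after the block itself (blockId.name), so the close handler in
BlockManager can skip mapBlockToFileSegment entirely; that skipped mapping is
the "extra memory usage" in the subject. Below is a minimal, self-contained
Scala sketch of the naming decision. WriterNamingSketch and filenameFor are
hypothetical names, and the two name formats are assumptions modeled on
ShuffleBlockId.name and physicalFileName from this era of the codebase:

    object WriterNamingSketch {
      // Simplified stand-in for org.apache.spark.storage.ShuffleBlockId.
      case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) {
        def name: String = "shuffle_%d_%d_%d".format(shuffleId, mapId, reduceId)
      }

      // Assumed format of ShuffleBlockManager.physicalFileName.
      def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int): String =
        "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)

      // The decision this patch introduces when allocating writers:
      // consolidation shares physical files across blocks, so each block must
      // be mapped to a file segment; without consolidation the block name
      // itself is the filename and no mapping is needed.
      def filenameFor(blockId: ShuffleBlockId, fileId: Int,
                      consolidateShuffleFiles: Boolean): String =
        if (consolidateShuffleFiles) {
          physicalFileName(blockId.shuffleId, blockId.reduceId, fileId)
        } else {
          blockId.name
        }

      def main(args: Array[String]): Unit = {
        val id = ShuffleBlockId(shuffleId = 0, mapId = 1, reduceId = 2)
        // blockId.name == filename, so the close handler records no mapping:
        println(filenameFor(id, fileId = 5, consolidateShuffleFiles = false))
        // => shuffle_0_1_2
        println(filenameFor(id, fileId = 5, consolidateShuffleFiles = true))
        // => merged_shuffle_0_2_5
      }
    }

This name comparison is exactly the guard the BlockManager hunk below adds
with blockId.name != filename.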

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e6329cbd47..c02c0327ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -523,7 +523,10 @@ private[spark] class BlockManager(
     val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true)
     val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
     writer.registerCloseEventHandler(() => {
-      diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
+      if (blockId.name != filename) {
+        // Avoid producing mappings if we're using unique filenames for blocks.
+        diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
+      }
       val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
       blockInfo.put(blockId, myInfo)
       myInfo.markReady(writer.fileSegment().length)
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 229178c095..066e45a12b 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -63,8 +63,12 @@ class ShuffleBlockManager(blockManager: BlockManager) {
         val fileId = getUnusedFileId()
         val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          val filename = physicalFileName(shuffleId, bucketId, fileId)
-          blockManager.getDiskWriter(blockId, filename, serializer, bufferSize)
+          if (consolidateShuffleFiles) {
+            val filename = physicalFileName(shuffleId, bucketId, fileId)
+            blockManager.getDiskWriter(blockId, filename, serializer, bufferSize)
+          } else {
+            blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize)
+          }
         }
         new ShuffleWriterGroup(mapId, fileId, writers)
       }
@@ -81,8 +85,9 @@ class ShuffleBlockManager(blockManager: BlockManager) {
   }
 
   private def recycleFileId(fileId: Int) {
-    if (!consolidateShuffleFiles) { return } // ensures we always generate new file id
-    unusedFileIds.add(fileId)
+    if (consolidateShuffleFiles) {
+      unusedFileIds.add(fileId)
+    }
   }
 
   private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
-- 
GitLab