From 2d7cf6a271dbd494f1d351e6db7db8568733edc3 Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@apache.org>
Date: Sun, 27 Oct 2013 12:51:54 -0700
Subject: [PATCH] Restructure BlockInfo fields to reduce memory use.

---
 .../apache/spark/storage/BlockManager.scala   | 32 ++++++++++---------
 1 file changed, 17 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index dbe573dc64..285cf022f6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{InputStream, OutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable.{HashMap, ArrayBuffer}
 import scala.util.Random
@@ -46,11 +47,17 @@ private[spark] class BlockManager(
     maxMemory: Long)
   extends Logging {
 
+
+  // initThread is logically a BlockInfo field, but we store it here because
+  // it's only needed while this block is in the 'pending' state and we want
+  // to minimize BlockInfo's memory footprint.
+  private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
+
   private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
-    @volatile var pending: Boolean = true
-    @volatile var size: Long = -1L
-    @volatile var initThread: Thread = null
-    @volatile var failed = false
+    @volatile var size: Long = -1L  // also encodes 'pending' and 'failed' to save space
+    private def pending: Boolean = size == -1L
+    private def failed: Boolean = size == -2L
+    private def initThread: Thread = blockInfoInitThreads.get(this)
 
     setInitThread()
 
@@ -58,7 +65,7 @@ private[spark] class BlockManager(
       // Set current thread as init thread - waitForReady will not block this thread
       // (in case there is non trivial initialization which ends up calling waitForReady as part of
       // initialization itself)
-      this.initThread = Thread.currentThread()
+      blockInfoInitThreads.put(this, Thread.currentThread())
     }
 
     /**
@@ -66,7 +73,7 @@ private[spark] class BlockManager(
      * Return true if the block is available, false otherwise.
      */
     def waitForReady(): Boolean = {
-      if (initThread != Thread.currentThread() && pending) {
+      if (pending && initThread != Thread.currentThread()) {
         synchronized {
           while (pending) this.wait()
         }
@@ -76,12 +83,10 @@ private[spark] class BlockManager(
 
     /** Mark this BlockInfo as ready (i.e. block is finished writing) */
     def markReady(sizeInBytes: Long) {
+      require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
       assert (pending)
       size = sizeInBytes
-      initThread = null
-      failed = false
-      initThread = null
-      pending = false
+      blockInfoInitThreads.remove(this)
       synchronized {
         this.notifyAll()
       }
@@ -90,11 +95,8 @@ private[spark] class BlockManager(
     /** Mark this BlockInfo as ready but failed */
     def markFailure() {
       assert (pending)
-      size = 0
-      initThread = null
-      failed = true
-      initThread = null
-      pending = false
+      size = -2L
+      blockInfoInitThreads.remove(this)
       synchronized {
         this.notifyAll()
       }
-- 
GitLab