diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
new file mode 100644
index 0000000000000000000000000000000000000000..dbe0bda61589ca6a2d8e138113e48e8acfdaa392
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ConcurrentHashMap
+
+private[storage] trait BlockInfo {
+  def level: StorageLevel
+  def tellMaster: Boolean
+  // To save space, 'pending' and 'failed' are encoded as special sizes:
+  @volatile var size: Long = BlockInfo.BLOCK_PENDING
+  private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
+  private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
+  private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)
+
+  setInitThread()
+
+  private def setInitThread() {
+    // Set current thread as init thread - waitForReady will not block this thread
+    // (in case there is non trivial initialization which ends up calling waitForReady as part of
+    // initialization itself)
+    BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
+  }
+
+  /**
+   * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
+   * Return true if the block is available, false otherwise.
+   */
+  def waitForReady(): Boolean = {
+    if (pending && initThread != Thread.currentThread()) {
+      synchronized {
+        while (pending) this.wait()
+      }
+    }
+    !failed
+  }
+
+  /** Mark this BlockInfo as ready (i.e. block is finished writing) */
+  def markReady(sizeInBytes: Long) {
+    require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
+    assert (pending)
+    size = sizeInBytes
+    BlockInfo.blockInfoInitThreads.remove(this)
+    synchronized {
+      this.notifyAll()
+    }
+  }
+
+  /** Mark this BlockInfo as ready but failed */
+  def markFailure() {
+    assert (pending)
+    size = BlockInfo.BLOCK_FAILED
+    BlockInfo.blockInfoInitThreads.remove(this)
+    synchronized {
+      this.notifyAll()
+    }
+  }
+}
+
+private object BlockInfo {
+  // initThread is logically a BlockInfo field, but we store it here because
+  // it's only needed while this block is in the 'pending' state and we want
+  // to minimize BlockInfo's memory footprint.
+  private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
+
+  private val BLOCK_PENDING: Long = -1L
+  private val BLOCK_FAILED: Long = -2L
+}
+
+// All shuffle blocks have the same `level` and `tellMaster` properties,
+// so we can save space by not storing them in each instance:
+private[storage] class ShuffleBlockInfo extends BlockInfo {
+  // These need to be defined using 'def' instead of 'val' in order for
+  // the compiler to eliminate the fields:
+  def level: StorageLevel = StorageLevel.DISK_ONLY
+  def tellMaster: Boolean = false
+}
+
+private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
+  extends BlockInfo {
+  // Intentionally left blank
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 11cda5de556a0b5286db25d8d3c8e35f7fff49ac..76d537f8e838a75ec9f6296e143e13011a46408d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,7 +19,6 @@ package org.apache.spark.storage
 
 import java.io.{InputStream, OutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable.{HashMap, ArrayBuffer}
 import scala.util.Random
@@ -47,80 +46,6 @@ private[spark] class BlockManager(
     maxMemory: Long)
   extends Logging {
 
-
-  // initThread is logically a BlockInfo field, but we store it here because
-  // it's only needed while this block is in the 'pending' state and we want
-  // to minimize BlockInfo's memory footprint.
-  private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
-
-  private val BLOCK_PENDING: Long = -1L
-  private val BLOCK_FAILED: Long = -2L
-
-  private trait BlockInfo {
-    def level: StorageLevel
-    def tellMaster: Boolean
-    @volatile var size: Long = BLOCK_PENDING  // also encodes 'pending' and 'failed' to save space
-    private def pending: Boolean = size == BLOCK_PENDING
-    private def failed: Boolean = size == BLOCK_FAILED
-    private def initThread: Thread = blockInfoInitThreads.get(this)
-
-    setInitThread()
-
-    private def setInitThread() {
-      // Set current thread as init thread - waitForReady will not block this thread
-      // (in case there is non trivial initialization which ends up calling waitForReady as part of
-      // initialization itself)
-      blockInfoInitThreads.put(this, Thread.currentThread())
-    }
-
-    /**
-     * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
-     * Return true if the block is available, false otherwise.
-     */
-    def waitForReady(): Boolean = {
-      if (pending && initThread != Thread.currentThread()) {
-        synchronized {
-          while (pending) this.wait()
-        }
-      }
-      !failed
-    }
-
-    /** Mark this BlockInfo as ready (i.e. block is finished writing) */
-    def markReady(sizeInBytes: Long) {
-      require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
-      assert (pending)
-      size = sizeInBytes
-      blockInfoInitThreads.remove(this)
-      synchronized {
-        this.notifyAll()
-      }
-    }
-
-    /** Mark this BlockInfo as ready but failed */
-    def markFailure() {
-      assert (pending)
-      size = BLOCK_FAILED
-      blockInfoInitThreads.remove(this)
-      synchronized {
-        this.notifyAll()
-      }
-    }
-  }
-
-  // All shuffle blocks have the same `level` and `tellMaster` properties,
-  // so we can save space by not storing them in each instance:
-  private class ShuffleBlockInfo extends BlockInfo {
-    // These need to be defined using 'def' instead of 'val' in order for
-    // the compiler to eliminate the fields:
-    def level: StorageLevel = StorageLevel.DISK_ONLY
-    def tellMaster: Boolean = false
-  }
-
-  private class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean) extends BlockInfo {
-    // Intentionally left blank
-  }
-
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(
     System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))