diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 8f867686a044330cb23f2f677911a76a9415ac21..5ddda4d6953fad24817813b4d2526a5f6792e3ab 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark
 
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.executor.InputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
@@ -30,7 +30,7 @@ import org.apache.spark.storage._
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
 
   /** Keys of RDD partitions that are being computed/loaded. */
-  private val loading = new HashSet[RDDBlockId]()
+  private val loading = new mutable.HashSet[RDDBlockId]
 
   /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](
@@ -118,21 +118,29 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   }
 
   /**
-   * Cache the values of a partition, keeping track of any updates in the storage statuses
-   * of other blocks along the way.
+   * Cache the values of a partition, keeping track of any updates in the storage statuses of
+   * other blocks along the way.
+   *
+   * The effective storage level refers to the level that actually specifies BlockManager put
+   * behavior, not the level originally specified by the user. This is mainly for forcing a
+   * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
+   * while preserving the the original semantics of the RDD as specified by the application.
    */
   private def putInBlockManager[T](
       key: BlockId,
       values: Iterator[T],
-      storageLevel: StorageLevel,
-      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
-
-    if (!storageLevel.useMemory) {
-      /* This RDD is not to be cached in memory, so we can just pass the computed values
-       * as an iterator directly to the BlockManager, rather than first fully unrolling
-       * it in memory. The latter option potentially uses much more memory and risks OOM
-       * exceptions that can be avoided. */
-      updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+      level: StorageLevel,
+      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
+      effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
+
+    val putLevel = effectiveStorageLevel.getOrElse(level)
+    if (!putLevel.useMemory) {
+      /*
+       * This RDD is not to be cached in memory, so we can just pass the computed values as an
+       * iterator directly to the BlockManager rather than first fully unrolling it in memory.
+       */
+      updatedBlocks ++=
+        blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
       blockManager.get(key) match {
         case Some(v) => v.data.asInstanceOf[Iterator[T]]
         case None =>
@@ -140,14 +148,36 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           throw new BlockException(key, s"Block manager failed to return cached value for $key!")
       }
     } else {
-      /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+      /*
+       * This RDD is to be cached in memory. In this case we cannot pass the computed values
        * to the BlockManager as an iterator and expect to read it back later. This is because
-       * we may end up dropping a partition from memory store before getting it back, e.g.
-       * when the entirety of the RDD does not fit in memory. */
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
-      updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
-      elements.iterator.asInstanceOf[Iterator[T]]
+       * we may end up dropping a partition from memory store before getting it back.
+       *
+       * In addition, we must be careful to not unroll the entire partition in memory at once.
+       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
+       * single partition. Instead, we unroll the values cautiously, potentially aborting and
+       * dropping the partition to disk if applicable.
+       */
+      blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
+        case Left(arr) =>
+          // We have successfully unrolled the entire partition, so cache it in memory
+          updatedBlocks ++=
+            blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
+          arr.iterator.asInstanceOf[Iterator[T]]
+        case Right(it) =>
+          // There is not enough space to cache this partition in memory
+          logWarning(s"Not enough space to cache partition $key in memory! " +
+            s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
+          val returnValues = it.asInstanceOf[Iterator[T]]
+          if (putLevel.useDisk) {
+            logWarning(s"Persisting partition $key to disk instead.")
+            val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
+              useOffHeap = false, deserialized = false, putLevel.replication)
+            putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
+          } else {
+            returnValues
+          }
+      }
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 8f70744d804d9bde09e11e2de311792ad2b3c8db..6ee731b22c03c4915937ce3724b5962483ca1988 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -67,7 +67,7 @@ class SparkEnv (
     val metricsSystem: MetricsSystem,
     val conf: SparkConf) extends Logging {
 
-  // A mapping of thread ID to amount of memory used for shuffle in bytes
+  // A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b16133b20cc029d28f6704fdc9dde24dccc3ec83..3b69bc4ca41426e2fb62b6368fad613501f56678 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -266,11 +266,13 @@ private[spark] class Executor(
           }
         }
       } finally {
-        // TODO: Unregister shuffle memory only for ResultTask
+        // Release memory used by this thread for shuffles
         val shuffleMemoryMap = env.shuffleMemoryMap
         shuffleMemoryMap.synchronized {
           shuffleMemoryMap.remove(Thread.currentThread().getId)
         }
+        // Release memory used by this thread for unrolling blocks
+        env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
         runningTasks.remove(taskId)
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0db0a5bc7341be2269dfbd61037d396e2bf7f94b..d746526639e5875af5c06db63b25ba02aa227294 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util._
 private[spark] sealed trait BlockValues
 private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
 private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
-private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
+private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
 
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
@@ -71,9 +71,9 @@ private[spark] class BlockManager(
 
   // Actual storage of where blocks are kept
   private var tachyonInitialized = false
-  private[storage] val memoryStore = new MemoryStore(this, maxMemory)
-  private[storage] val diskStore = new DiskStore(this, diskBlockManager)
-  private[storage] lazy val tachyonStore: TachyonStore = {
+  private[spark] val memoryStore = new MemoryStore(this, maxMemory)
+  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+  private[spark] lazy val tachyonStore: TachyonStore = {
     val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
     val appFolderName = conf.get("spark.tachyonStore.folderName")
     val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
@@ -463,16 +463,17 @@ private[spark] class BlockManager(
               val values = dataDeserialize(blockId, bytes)
               if (level.deserialized) {
                 // Cache the values before returning them
-                // TODO: Consider creating a putValues that also takes in a iterator?
-                val valuesBuffer = new ArrayBuffer[Any]
-                valuesBuffer ++= values
-                memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
-                  match {
-                    case Left(values2) =>
-                      return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
-                    case _ =>
-                      throw new SparkException("Memory store did not return back an iterator")
-                  }
+                val putResult = memoryStore.putIterator(
+                  blockId, values, level, returnValues = true, allowPersistToDisk = false)
+                // The put may or may not have succeeded, depending on whether there was enough
+                // space to unroll the block. Either way, the put here should return an iterator.
+                putResult.data match {
+                  case Left(it) =>
+                    return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
+                  case _ =>
+                    // This only happens if we dropped the values back to disk (which is never)
+                    throw new SparkException("Memory store did not return an iterator!")
+                }
               } else {
                 return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
               }
@@ -561,13 +562,14 @@ private[spark] class BlockManager(
     iter
   }
 
-  def put(
+  def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(values != null, "Values is null")
-    doPut(blockId, IteratorValues(values), level, tellMaster)
+    doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
   }
 
   /**
@@ -589,13 +591,14 @@ private[spark] class BlockManager(
    * Put a new block of values to the block manager.
    * Return a list of blocks updated as a result of this put.
    */
-  def put(
+  def putArray(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(values != null, "Values is null")
-    doPut(blockId, ArrayBufferValues(values), level, tellMaster)
+    doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
   }
 
   /**
@@ -606,19 +609,33 @@ private[spark] class BlockManager(
       blockId: BlockId,
       bytes: ByteBuffer,
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(bytes != null, "Bytes is null")
-    doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
+    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
   }
 
+  /**
+   * Put the given block according to the given level in one of the block stores, replicating
+   * the values if necessary.
+   *
+   * The effective storage level refers to the level according to which the block will actually be
+   * handled. This allows the caller to specify an alternate behavior of doPut while preserving
+   * the original level specified by the user.
+   */
   private def doPut(
       blockId: BlockId,
       data: BlockValues,
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None)
+    : Seq[(BlockId, BlockStatus)] = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
+    effectiveStorageLevel.foreach { level =>
+      require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
+    }
 
     // Return value
     val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -657,13 +674,16 @@ private[spark] class BlockManager(
     // Size of the block in bytes
     var size = 0L
 
+    // The level we actually use to put the block
+    val putLevel = effectiveStorageLevel.getOrElse(level)
+
     // If we're storing bytes, then initiate the replication before storing them locally.
     // This is faster as data is already serialized and ready to send.
     val replicationFuture = data match {
-      case b: ByteBufferValues if level.replication > 1 =>
+      case b: ByteBufferValues if putLevel.replication > 1 =>
         // Duplicate doesn't copy the bytes, but just creates a wrapper
         val bufferView = b.buffer.duplicate()
-        Future { replicate(blockId, bufferView, level) }
+        Future { replicate(blockId, bufferView, putLevel) }
       case _ => null
     }
 
@@ -676,18 +696,18 @@ private[spark] class BlockManager(
         // returnValues - Whether to return the values put
         // blockStore - The type of storage to put these values into
         val (returnValues, blockStore: BlockStore) = {
-          if (level.useMemory) {
+          if (putLevel.useMemory) {
             // Put it in memory first, even if it also has useDisk set to true;
             // We will drop it to disk later if the memory store can't hold it.
             (true, memoryStore)
-          } else if (level.useOffHeap) {
+          } else if (putLevel.useOffHeap) {
             // Use tachyon for off-heap storage
             (false, tachyonStore)
-          } else if (level.useDisk) {
+          } else if (putLevel.useDisk) {
             // Don't get back the bytes from put unless we replicate them
-            (level.replication > 1, diskStore)
+            (putLevel.replication > 1, diskStore)
           } else {
-            assert(level == StorageLevel.NONE)
+            assert(putLevel == StorageLevel.NONE)
             throw new BlockException(
               blockId, s"Attempted to put block $blockId without specifying storage level!")
           }
@@ -696,22 +716,22 @@ private[spark] class BlockManager(
         // Actually put the values
         val result = data match {
           case IteratorValues(iterator) =>
-            blockStore.putValues(blockId, iterator, level, returnValues)
-          case ArrayBufferValues(array) =>
-            blockStore.putValues(blockId, array, level, returnValues)
+            blockStore.putIterator(blockId, iterator, putLevel, returnValues)
+          case ArrayValues(array) =>
+            blockStore.putArray(blockId, array, putLevel, returnValues)
           case ByteBufferValues(bytes) =>
             bytes.rewind()
-            blockStore.putBytes(blockId, bytes, level)
+            blockStore.putBytes(blockId, bytes, putLevel)
         }
         size = result.size
         result.data match {
-          case Left (newIterator) if level.useMemory => valuesAfterPut = newIterator
+          case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
           case Right (newBytes) => bytesAfterPut = newBytes
           case _ =>
         }
 
         // Keep track of which blocks are dropped from memory
-        if (level.useMemory) {
+        if (putLevel.useMemory) {
           result.droppedBlocks.foreach { updatedBlocks += _ }
         }
 
@@ -742,7 +762,7 @@ private[spark] class BlockManager(
 
     // Either we're storing bytes and we asynchronously started replication, or we're storing
     // values and need to serialize and replicate them now:
-    if (level.replication > 1) {
+    if (putLevel.replication > 1) {
       data match {
         case ByteBufferValues(bytes) =>
           if (replicationFuture != null) {
@@ -758,7 +778,7 @@ private[spark] class BlockManager(
             }
             bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
           }
-          replicate(blockId, bytesAfterPut, level)
+          replicate(blockId, bytesAfterPut, putLevel)
           logDebug("Put block %s remotely took %s"
             .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
       }
@@ -766,7 +786,7 @@ private[spark] class BlockManager(
 
     BlockManager.dispose(bytesAfterPut)
 
-    if (level.replication > 1) {
+    if (putLevel.replication > 1) {
       logDebug("Putting block %s with replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     } else {
@@ -818,7 +838,7 @@ private[spark] class BlockManager(
       value: Any,
       level: StorageLevel,
       tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
-    put(blockId, Iterator(value), level, tellMaster)
+    putIterator(blockId, Iterator(value), level, tellMaster)
   }
 
   /**
@@ -829,7 +849,7 @@ private[spark] class BlockManager(
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfo.get(blockId).orNull
@@ -853,7 +873,7 @@ private[spark] class BlockManager(
           logInfo(s"Writing block $blockId to disk")
           data match {
             case Left(elements) =>
-              diskStore.putValues(blockId, elements, level, returnValues = false)
+              diskStore.putArray(blockId, elements, level, returnValues = false)
             case Right(bytes) =>
               diskStore.putBytes(blockId, bytes, level)
           }
@@ -1068,9 +1088,11 @@ private[spark] class BlockManager(
 private[spark] object BlockManager extends Logging {
   private val ID_GENERATOR = new IdGenerator
 
+  /** Return the total amount of storage memory available. */
   private def getMaxMemory(conf: SparkConf): Long = {
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
-    (Runtime.getRuntime.maxMemory * memoryFraction).toLong
+    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
+    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
 
   def getHeartBeatFrequency(conf: SparkConf): Long =
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index b9b53b1a2f1181d9e886ff0d0f8195d6eb343730..69985c9759e2d09844dfe61d533adc365468e01e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -37,15 +37,15 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
    * @return a PutResult that contains the size of the data, as well as the values put if
    *         returnValues is true (if not, the result's data field can be null)
    */
-  def putValues(
+  def putIterator(
     blockId: BlockId,
     values: Iterator[Any],
     level: StorageLevel,
     returnValues: Boolean): PutResult
 
-  def putValues(
+  def putArray(
     blockId: BlockId,
-    values: ArrayBuffer[Any],
+    values: Array[Any],
     level: StorageLevel,
     returnValues: Boolean): PutResult
 
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index ebff0cb5ba1532274312232a1c91b571e7d498c3..c83261dd91b36cb758ec183562b1f1ee4d6a2b59 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -21,8 +21,6 @@ import java.io.{FileOutputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
@@ -30,7 +28,7 @@ import org.apache.spark.util.Utils
 /**
  * Stores BlockManager blocks on disk.
  */
-private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
+private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
   extends BlockStore(blockManager) with Logging {
 
   val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
@@ -57,15 +55,15 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
     PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putValues(blockId, values.toIterator, level, returnValues)
+    putIterator(blockId, values.toIterator, level, returnValues)
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 71f66c826c5b3ec664d6130de1665278fa71cbfb..28f675c2bbb1e2b33f0a8701b04731b4e091facb 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -20,27 +20,45 @@ package org.apache.spark.storage
 import java.nio.ByteBuffer
 import java.util.LinkedHashMap
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.util.{SizeEstimator, Utils}
+import org.apache.spark.util.collection.SizeTrackingVector
 
 private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
 
 /**
- * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
+ * Stores blocks in memory, either as Arrays of deserialized Java objects or as
  * serialized ByteBuffers.
  */
-private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
+private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   extends BlockStore(blockManager) {
 
+  private val conf = blockManager.conf
   private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
+
   @volatile private var currentMemory = 0L
-  // Object used to ensure that only one thread is putting blocks and if necessary, dropping
-  // blocks from the memory store.
-  private val putLock = new Object()
+
+  // Ensure only one thread is putting, and if necessary, dropping blocks at any given time
+  private val accountingLock = new Object
+
+  // A mapping from thread ID to amount of memory used for unrolling a block (in bytes)
+  // All accesses of this map are assumed to have manually synchronized on `accountingLock`
+  private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+
+  /**
+   * The amount of space ensured for unrolling values in memory, shared across all cores.
+   * This space is not reserved in advance, but allocated dynamically by dropping existing blocks.
+   */
+  private val maxUnrollMemory: Long = {
+    val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2)
+    (maxMemory * unrollFraction).toLong
+  }
 
   logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))
 
+  /** Free memory not occupied by existing blocks. Note that this does not include unroll memory. */
   def freeMemory: Long = maxMemory - currentMemory
 
   override def getSize(blockId: BlockId): Long = {
@@ -55,20 +73,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     bytes.rewind()
     if (level.deserialized) {
       val values = blockManager.dataDeserialize(blockId, bytes)
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
-      val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
-      val putAttempt = tryToPut(blockId, elements, sizeEstimate, deserialized = true)
-      PutResult(sizeEstimate, Left(values.toIterator), putAttempt.droppedBlocks)
+      putIterator(blockId, values, level, returnValues = true)
     } else {
       val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
     }
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
     if (level.deserialized) {
@@ -82,14 +96,52 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    val valueEntries = new ArrayBuffer[Any]()
-    valueEntries ++= values
-    putValues(blockId, valueEntries, level, returnValues)
+    putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
+  }
+
+  /**
+   * Attempt to put the given block in memory store.
+   *
+   * There may not be enough space to fully unroll the iterator in memory, in which case we
+   * optionally drop the values to disk if
+   *   (1) the block's storage level specifies useDisk, and
+   *   (2) `allowPersistToDisk` is true.
+   *
+   * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
+   * back from disk and attempts to cache it in memory. In this case, we should not persist the
+   * block back on disk again, as it is already in disk store.
+   */
+  private[storage] def putIterator(
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel,
+      returnValues: Boolean,
+      allowPersistToDisk: Boolean): PutResult = {
+    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+    val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
+    unrolledValues match {
+      case Left(arrayValues) =>
+        // Values are fully unrolled in memory, so store them as an array
+        val res = putArray(blockId, arrayValues, level, returnValues)
+        droppedBlocks ++= res.droppedBlocks
+        PutResult(res.size, res.data, droppedBlocks)
+      case Right(iteratorValues) =>
+        // Not enough space to unroll this block; drop to disk if applicable
+        logWarning(s"Not enough space to store block $blockId in memory! " +
+          s"Free memory is $freeMemory bytes.")
+        if (level.useDisk && allowPersistToDisk) {
+          logWarning(s"Persisting block $blockId to disk instead.")
+          val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
+          PutResult(res.size, res.data, droppedBlocks)
+        } else {
+          PutResult(0, Left(iteratorValues), droppedBlocks)
+        }
+    }
   }
 
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -99,7 +151,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     if (entry == null) {
       None
     } else if (entry.deserialized) {
-      Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
+      Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
     } else {
       Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
     }
@@ -112,7 +164,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     if (entry == null) {
       None
     } else if (entry.deserialized) {
-      Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+      Some(entry.value.asInstanceOf[Array[Any]].iterator)
     } else {
       val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
       Some(blockManager.dataDeserialize(blockId, buffer))
@@ -140,6 +192,93 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     logInfo("MemoryStore cleared")
   }
 
+  /**
+   * Unroll the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM exceptions caused by
+   * unrolling the entirety of the block in memory at once. This is achieved by periodically
+   * checking whether the memory restrictions for unrolling blocks are still satisfied,
+   * stopping immediately if not. This check is a safeguard against the scenario in which
+   * there is not enough free memory to accommodate the entirety of a single block.
+   *
+   * This method returns either an array with the contents of the entire block or an iterator
+   * containing the values of the block (if the array would have exceeded available memory).
+   */
+  def unrollSafely(
+      blockId: BlockId,
+      values: Iterator[Any],
+      droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+    : Either[Array[Any], Iterator[Any]] = {
+
+    // Number of elements unrolled so far
+    var elementsUnrolled = 0
+    // Whether there is still enough memory for us to continue unrolling this block
+    var keepUnrolling = true
+    // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing.
+    val initialMemoryThreshold = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
+    // How often to check whether we need to request more memory
+    val memoryCheckPeriod = 16
+    // Memory currently reserved by this thread for this particular unrolling operation
+    var memoryThreshold = initialMemoryThreshold
+    // Memory to request as a multiple of current vector size
+    val memoryGrowthFactor = 1.5
+    // Previous unroll memory held by this thread, for releasing later (only at the very end)
+    val previousMemoryReserved = currentUnrollMemoryForThisThread
+    // Underlying vector for unrolling the block
+    var vector = new SizeTrackingVector[Any]
+
+    // Request enough memory to begin unrolling
+    keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)
+
+    // Unroll this block safely, checking whether we have exceeded our threshold periodically
+    try {
+      while (values.hasNext && keepUnrolling) {
+        vector += values.next()
+        if (elementsUnrolled % memoryCheckPeriod == 0) {
+          // If our vector's size has exceeded the threshold, request more memory
+          val currentSize = vector.estimateSize()
+          if (currentSize >= memoryThreshold) {
+            val amountToRequest = (currentSize * (memoryGrowthFactor - 1)).toLong
+            // Hold the accounting lock, in case another thread concurrently puts a block that
+            // takes up the unrolling space we just ensured here
+            accountingLock.synchronized {
+              if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
+                // If the first request is not granted, try again after ensuring free space
+                // If there is still not enough space, give up and drop the partition
+                val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
+                if (spaceToEnsure > 0) {
+                  val result = ensureFreeSpace(blockId, spaceToEnsure)
+                  droppedBlocks ++= result.droppedBlocks
+                }
+                keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
+              }
+            }
+            // New threshold is currentSize * memoryGrowthFactor
+            memoryThreshold = currentSize + amountToRequest
+          }
+        }
+        elementsUnrolled += 1
+      }
+
+      if (keepUnrolling) {
+        // We successfully unrolled the entirety of this block
+        Left(vector.toArray)
+      } else {
+        // We ran out of space while unrolling the values for this block
+        Right(vector.iterator ++ values)
+      }
+
+    } finally {
+      // If we return an array, the values returned do not depend on the underlying vector and
+      // we can immediately free up space for other threads. Otherwise, if we return an iterator,
+      // we release the memory claimed by this thread later on when the task finishes.
+      if (keepUnrolling) {
+        val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
+        releaseUnrollMemoryForThisThread(amountToRelease)
+      }
+    }
+  }
+
   /**
    * Return the RDD ID that a given block ID is from, or None if it is not an RDD block.
    */
@@ -149,10 +288,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
 
   /**
    * Try to put in a set of values, if we can free up enough space. The value should either be
-   * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
-   * size must also be passed by the caller.
+   * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
+   * must also be passed by the caller.
    *
-   * Lock on the object putLock to ensure that all the put requests and its associated block
+   * Synchronize on `accountingLock` to ensure that all the put requests and its associated block
    * dropping is done by only on thread at a time. Otherwise while one thread is dropping
    * blocks to free memory for one block, another thread may use up the freed space for
    * another block.
@@ -174,7 +313,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     var putSuccess = false
     val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
 
-    putLock.synchronized {
+    accountingLock.synchronized {
       val freeSpaceResult = ensureFreeSpace(blockId, size)
       val enoughFreeSpace = freeSpaceResult.success
       droppedBlocks ++= freeSpaceResult.droppedBlocks
@@ -193,7 +332,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         // Tell the block manager that we couldn't put it in memory so that it can drop it to
         // disk if the block allows disk storage.
         val data = if (deserialized) {
-          Left(value.asInstanceOf[ArrayBuffer[Any]])
+          Left(value.asInstanceOf[Array[Any]])
         } else {
           Right(value.asInstanceOf[ByteBuffer].duplicate())
         }
@@ -210,12 +349,14 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
    * don't fit into memory that we want to avoid).
    *
-   * Assume that a lock is held by the caller to ensure only one thread is dropping blocks.
-   * Otherwise, the freed space may fill up before the caller puts in their new value.
+   * Assume that `accountingLock` is held by the caller to ensure only one thread is dropping
+   * blocks. Otherwise, the freed space may fill up before the caller puts in their new value.
    *
    * Return whether there is enough free space, along with the blocks dropped in the process.
    */
-  private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = {
+  private def ensureFreeSpace(
+      blockIdToAdd: BlockId,
+      space: Long): ResultWithDroppedBlocks = {
     logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")
 
     val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -225,9 +366,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       return ResultWithDroppedBlocks(success = false, droppedBlocks)
     }
 
-    if (maxMemory - currentMemory < space) {
+    // Take into account the amount of memory currently occupied by unrolling blocks
+    val actualFreeMemory = freeMemory - currentUnrollMemory
+
+    if (actualFreeMemory < space) {
       val rddToAdd = getRddId(blockIdToAdd)
-      val selectedBlocks = new ArrayBuffer[BlockId]()
+      val selectedBlocks = new ArrayBuffer[BlockId]
       var selectedMemory = 0L
 
       // This is synchronized to ensure that the set of entries is not changed
@@ -235,7 +379,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       // can lead to exceptions.
       entries.synchronized {
         val iterator = entries.entrySet().iterator()
-        while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+        while (actualFreeMemory + selectedMemory < space && iterator.hasNext) {
           val pair = iterator.next()
           val blockId = pair.getKey
           if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
@@ -245,7 +389,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         }
       }
 
-      if (maxMemory - (currentMemory - selectedMemory) >= space) {
+      if (actualFreeMemory + selectedMemory >= space) {
         logInfo(s"${selectedBlocks.size} blocks selected for dropping")
         for (blockId <- selectedBlocks) {
           val entry = entries.synchronized { entries.get(blockId) }
@@ -254,7 +398,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           // future safety.
           if (entry != null) {
             val data = if (entry.deserialized) {
-              Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
+              Left(entry.value.asInstanceOf[Array[Any]])
             } else {
               Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
             }
@@ -275,8 +419,56 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   override def contains(blockId: BlockId): Boolean = {
     entries.synchronized { entries.containsKey(blockId) }
   }
+
+  /**
+   * Reserve additional memory for unrolling blocks used by this thread.
+   * Return whether the request is granted.
+   */
+  private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
+    accountingLock.synchronized {
+      val granted = freeMemory > currentUnrollMemory + memory
+      if (granted) {
+        val threadId = Thread.currentThread().getId
+        unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory
+      }
+      granted
+    }
+  }
+
+  /**
+   * Release memory used by this thread for unrolling blocks.
+   * If the amount is not specified, remove the current thread's allocation altogether.
+   */
+  private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
+    val threadId = Thread.currentThread().getId
+    accountingLock.synchronized {
+      if (memory < 0) {
+        unrollMemoryMap.remove(threadId)
+      } else {
+        unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory
+        // If this thread claims no more unroll memory, release it completely
+        if (unrollMemoryMap(threadId) <= 0) {
+          unrollMemoryMap.remove(threadId)
+        }
+      }
+    }
+  }
+
+  /**
+   * Return the amount of memory currently occupied for unrolling blocks across all threads.
+   */
+  private[spark] def currentUnrollMemory: Long = accountingLock.synchronized {
+    unrollMemoryMap.values.sum
+  }
+
+  /**
+   * Return the amount of memory currently occupied for unrolling blocks by this thread.
+   */
+  private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
+    unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
+  }
 }
 
-private case class ResultWithDroppedBlocks(
+private[spark] case class ResultWithDroppedBlocks(
     success: Boolean,
     droppedBlocks: Seq[(BlockId, BlockStatus)])
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index d8ff4ff6bd42c7e0a02f777285b2a21569018eca..932b5616043b447435e3ab43109e3f320f36ee3a 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -20,8 +20,6 @@ package org.apache.spark.storage
 import java.io.IOException
 import java.nio.ByteBuffer
 
-import scala.collection.mutable.ArrayBuffer
-
 import tachyon.client.{ReadType, WriteType}
 
 import org.apache.spark.Logging
@@ -30,7 +28,7 @@ import org.apache.spark.util.Utils
 /**
  * Stores BlockManager blocks on Tachyon.
  */
-private class TachyonStore(
+private[spark] class TachyonStore(
     blockManager: BlockManager,
     tachyonManager: TachyonBlockManager)
   extends BlockStore(blockManager: BlockManager) with Logging {
@@ -45,15 +43,15 @@ private class TachyonStore(
     putIntoTachyonStore(blockId, bytes, returnValues = true)
   }
 
-  override def putValues(
+  override def putArray(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putValues(blockId, values.toIterator, level, returnValues)
+    putIterator(blockId, values.toIterator, level, returnValues)
   }
 
-  override def putValues(
+  override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 328be158db680fc9fe2ccfca426724ee66f75d0c..75c2e09a6bbb84d0de3de7dfa97fb26e9b464181 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -48,7 +48,7 @@ private[spark] object ThreadingTest {
         val block = (1 to blockSize).map(_ => Random.nextInt())
         val level = randomLevel()
         val startTime = System.currentTimeMillis()
-        manager.put(blockId, block.iterator, level, tellMaster = true)
+        manager.putIterator(blockId, block.iterator, level, tellMaster = true)
         println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
         queue.add((blockId, block))
       }
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 08465575309c6b0e6df6a2c91a00c334c073bcd5..bce3b3afe9abad4d4c52f05f2547027e62a86251 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -180,7 +180,7 @@ private[spark] object SizeEstimator extends Logging {
     }
   }
 
-  // Estimat the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
+  // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
   private val ARRAY_SIZE_FOR_SAMPLING = 200
   private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
 
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
index b84eb65c62bc7732fc410fed483b34a237c963b8..7e76d060d6000da71ceaa0cc78cce8fcb6945567 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -36,7 +36,7 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:
     _array(index)
   }
 
-  def +=(value: V) {
+  def +=(value: V): Unit = {
     if (_numElements == _array.length) {
       resize(_array.length * 2)
     }
@@ -50,6 +50,19 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:
 
   def size: Int = _numElements
 
+  def iterator: Iterator[V] = new Iterator[V] {
+    var index = 0
+    override def hasNext: Boolean = index < _numElements
+    override def next(): V = {
+      if (!hasNext) {
+        throw new NoSuchElementException
+      }
+      val value = _array(index)
+      index += 1
+      value
+    }
+  }
+
   /** Gets the underlying array backing this vector. */
   def array: Array[V] = _array
 
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala
new file mode 100644
index 0000000000000000000000000000000000000000..3eb1010dc1e8d931e2b6d55ad1c734c6a266129a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+import org.apache.spark.util.SizeEstimator
+
+/**
+ * A general interface for collections to keep track of their estimated sizes in bytes.
+ * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
+ * as each call to SizeEstimator is somewhat expensive (order of a few milliseconds).
+ */
+private[spark] trait SizeTracker {
+
+  import SizeTracker._
+
+  /**
+   * Controls the base of the exponential which governs the rate of sampling.
+   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
+   */
+  private val SAMPLE_GROWTH_RATE = 1.1
+
+  /** Samples taken since last resetSamples(). Only the last two are kept for extrapolation. */
+  private val samples = new mutable.Queue[Sample]
+
+  /** The average number of bytes per update between our last two samples. */
+  private var bytesPerUpdate: Double = _
+
+  /** Total number of insertions and updates into the map since the last resetSamples(). */
+  private var numUpdates: Long = _
+
+  /** The value of 'numUpdates' at which we will take our next sample. */
+  private var nextSampleNum: Long = _
+
+  resetSamples()
+
+  /**
+   * Reset samples collected so far.
+   * This should be called after the collection undergoes a dramatic change in size.
+   */
+  protected def resetSamples(): Unit = {
+    numUpdates = 1
+    nextSampleNum = 1
+    samples.clear()
+    takeSample()
+  }
+
+  /**
+   * Callback to be invoked after every update.
+   */
+  protected def afterUpdate(): Unit = {
+    numUpdates += 1
+    if (nextSampleNum == numUpdates) {
+      takeSample()
+    }
+  }
+
+  /**
+   * Take a new sample of the current collection's size.
+   */
+  private def takeSample(): Unit = {
+    samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))
+    // Only use the last two samples to extrapolate
+    if (samples.size > 2) {
+      samples.dequeue()
+    }
+    val bytesDelta = samples.toList.reverse match {
+      case latest :: previous :: tail =>
+        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
+      // If fewer than 2 samples, assume no change
+      case _ => 0
+    }
+    bytesPerUpdate = math.max(0, bytesDelta)
+    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
+  }
+
+  /**
+   * Estimate the current size of the collection in bytes. O(1) time.
+   */
+  def estimateSize(): Long = {
+    assert(samples.nonEmpty)
+    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
+    (samples.last.size + extrapolatedDelta).toLong
+  }
+}
+
+private object SizeTracker {
+  case class Sample(size: Long, numUpdates: Long)
+}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
index 204330dad48b929670d83a13d4ddda5051c6f114..de61e1d17fe10200ab4c1bc1a44b1008ce3ed807 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
@@ -17,85 +17,24 @@
 
 package org.apache.spark.util.collection
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.util.SizeEstimator
-import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample
-
 /**
- * Append-only map that keeps track of its estimated size in bytes.
- * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
- * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
+ * An append-only map that keeps track of its estimated size in bytes.
  */
-private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
-
-  /**
-   * Controls the base of the exponential which governs the rate of sampling.
-   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
-   */
-  private val SAMPLE_GROWTH_RATE = 1.1
-
-  /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
-  private val samples = new ArrayBuffer[Sample]()
-
-  /** Total number of insertions and updates into the map since the last resetSamples(). */
-  private var numUpdates: Long = _
-
-  /** The value of 'numUpdates' at which we will take our next sample. */
-  private var nextSampleNum: Long = _
-
-  /** The average number of bytes per update between our last two samples. */
-  private var bytesPerUpdate: Double = _
-
-  resetSamples()
-
-  /** Called after the map grows in size, as this can be a dramatic change for small objects. */
-  def resetSamples() {
-    numUpdates = 1
-    nextSampleNum = 1
-    samples.clear()
-    takeSample()
-  }
+private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] with SizeTracker {
 
   override def update(key: K, value: V): Unit = {
     super.update(key, value)
-    numUpdates += 1
-    if (nextSampleNum == numUpdates) { takeSample() }
+    super.afterUpdate()
   }
 
   override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
     val newValue = super.changeValue(key, updateFunc)
-    numUpdates += 1
-    if (nextSampleNum == numUpdates) { takeSample() }
+    super.afterUpdate()
     newValue
   }
 
-  /** Takes a new sample of the current map's size. */
-  def takeSample() {
-    samples += Sample(SizeEstimator.estimate(this), numUpdates)
-    // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
-    bytesPerUpdate = math.max(0, samples.toSeq.reverse match {
-      case latest :: previous :: tail =>
-        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
-      case _ =>
-        0
-    })
-    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
-  }
-
-  override protected def growTable() {
+  override protected def growTable(): Unit = {
     super.growTable()
     resetSamples()
   }
-
-  /** Estimates the current size of the map in bytes. O(1) time. */
-  def estimateSize(): Long = {
-    assert(samples.nonEmpty)
-    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
-    (samples.last.size + extrapolatedDelta).toLong
-  }
-}
-
-private object SizeTrackingAppendOnlyMap {
-  case class Sample(size: Long, numUpdates: Long)
 }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala
new file mode 100644
index 0000000000000000000000000000000000000000..65a7b4e0d497b5cf8f13dc07e3577f66bad90e8f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+
+/**
+ * An append-only buffer that keeps track of its estimated size in bytes.
+ */
+private[spark] class SizeTrackingVector[T: ClassTag]
+  extends PrimitiveVector[T]
+  with SizeTracker {
+
+  override def +=(value: T): Unit = {
+    super.+=(value)
+    super.afterUpdate()
+  }
+
+  override def resize(newLength: Int): PrimitiveVector[T] = {
+    super.resize(newLength)
+    resetSamples()
+    this
+  }
+
+  /**
+   * Return a trimmed version of the underlying array.
+   */
+  def toArray: Array[T] = {
+    super.iterator.toArray
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 7f5d0b061e8b0bbc7ab43cebf96b31c36db4d218..9c5f394d3899d56206f58f261405abbaed042063 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.mock.EasyMockSugar
 
@@ -52,22 +50,21 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
   }
 
   test("get uncached rdd") {
-    expecting {
-      blockManager.get(RDDBlockId(0, 0)).andReturn(None)
-      blockManager.put(RDDBlockId(0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY,
-        true).andStubReturn(Seq[(BlockId, BlockStatus)]())
-    }
-
-    whenExecuting(blockManager) {
-      val context = new TaskContext(0, 0, 0)
-      val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
-      assert(value.toList === List(1, 2, 3, 4))
-    }
+    // Do not mock this test, because attempting to match Array[Any], which is not covariant,
+    // in blockManager.put is a losing battle. You have been warned.
+    blockManager = sc.env.blockManager
+    cacheManager = sc.env.cacheManager
+    val context = new TaskContext(0, 0, 0)
+    val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+    val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
+    assert(computeValue.toList === List(1, 2, 3, 4))
+    assert(getValue.isDefined, "Block cached from getOrCompute is not found!")
+    assert(getValue.get.data.toList === List(1, 2, 3, 4))
   }
 
   test("get cached rdd") {
     expecting {
-      val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+      val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
       blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
     }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 23cb6905bfdeb6a7ef576b8ab27c92f72198a482..dd4fd535d3577e32137eadf325053ac50dd676b6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.Matchers
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
@@ -43,6 +43,7 @@ import scala.language.postfixOps
 
 class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   with PrivateMethodTester {
+
   private val conf = new SparkConf(false)
   var store: BlockManager = null
   var store2: BlockManager = null
@@ -61,21 +62,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
   def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
 
+  private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+    new BlockManager(
+      name, actorSystem, master, serializer, maxMem, conf, securityMgr, mapOutputTracker)
+  }
+
   before {
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf,
-      securityManager = securityMgr)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      "test", "localhost", 0, conf = conf, securityManager = securityMgr)
     this.actorSystem = actorSystem
-    conf.set("spark.driver.port", boundPort.toString)
-
-    master = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
-      conf)
 
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     oldArch = System.setProperty("os.arch", "amd64")
     conf.set("os.arch", "amd64")
     conf.set("spark.test.useCompressedOops", "true")
     conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
+    conf.set("spark.driver.port", boundPort.toString)
+    conf.set("spark.storage.unrollFraction", "0.4")
+    conf.set("spark.storage.unrollMemoryThreshold", "512")
+
+    master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      conf)
+
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
   }
@@ -138,11 +147,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("master + 1 manager interaction") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(20000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
 
     // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -169,10 +177,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("master + 2 managers interaction") {
-    store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
-    store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000, "exec1")
+    store2 = makeBlockManager(2000, "exec2")
 
     val peers = master.getPeers(store.blockManagerId, 1)
     assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -187,11 +193,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("removing block") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(20000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
 
     // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
     store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
@@ -200,8 +205,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
     // Checking whether blocks are in memory and memory size
     val memStatus = master.getMemoryStatus.head._2
-    assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
-    assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
+    assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
+    assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
     assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store")
     assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store")
     assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store")
@@ -230,17 +235,16 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     }
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
       val memStatus = master.getMemoryStatus.head._2
-      memStatus._1 should equal (2000L)
-      memStatus._2 should equal (2000L)
+      memStatus._1 should equal (20000L)
+      memStatus._2 should equal (20000L)
     }
   }
 
   test("removing rdd") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(20000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     // Putting a1, a2 and a3 in memory.
     store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
@@ -270,11 +274,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("removing broadcast") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000)
     val driverStore = store
-    val executorStore = new BlockManager("executor", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    val executorStore = makeBlockManager(2000, "executor")
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -343,8 +345,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
   test("reregistration on heart beat") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
 
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -380,8 +381,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
   test("reregistration doesn't dead lock") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
     val a2 = List(new Array[Byte](400))
 
@@ -390,7 +390,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
       master.removeExecutor(store.blockManagerId.executorId)
       val t1 = new Thread {
         override def run() {
-          store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+          store.putIterator("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
         }
       }
       val t2 = new Thread {
@@ -418,19 +418,14 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("correct BlockResult returned from get() calls") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
-      mapOutputTracker)
-    val list1 = List(new Array[Byte](200), new Array[Byte](200))
-    val list1ForSizeEstimate = new ArrayBuffer[Any]
-    list1ForSizeEstimate ++= list1.iterator
-    val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
-    val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
-    val list2ForSizeEstimate = new ArrayBuffer[Any]
-    list2ForSizeEstimate ++= list2.iterator
-    val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    store = makeBlockManager(12000)
+    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
+    val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
+    val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
@@ -451,11 +446,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
@@ -471,11 +465,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU storage with serialization") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
@@ -491,11 +484,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU for partitions of same RDD") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
@@ -511,11 +503,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU for partitions of multiple RDDs") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store = makeBlockManager(12000)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // At this point rdd_1_1 should've replaced rdd_0_1
     assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
     assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
@@ -523,8 +514,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     // Do a get() on rdd_0_2 so that it is the most recently used item
     assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
     // Put in more partitions from RDD 0; they should replace rdd_1_1
-    store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
     // when we try to add rdd_0_4.
     assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -538,8 +529,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     // TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar.
     val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
     if (tachyonUnitTestEnabled) {
-      store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-        securityMgr, mapOutputTracker)
+      store = makeBlockManager(1200)
       val a1 = new Array[Byte](400)
       val a2 = new Array[Byte](400)
       val a3 = new Array[Byte](400)
@@ -555,8 +545,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("on-disk storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
+    store = makeBlockManager(1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -569,11 +558,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
@@ -585,11 +573,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage with getLocalBytes") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
@@ -601,11 +588,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage with serialization") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
@@ -617,11 +603,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("disk and memory storage with serialization and getLocalBytes") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
     store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
@@ -633,12 +618,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("LRU with mixed storage levels") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
-    val a3 = new Array[Byte](400)
-    val a4 = new Array[Byte](400)
+    store = makeBlockManager(12000)
+    val a1 = new Array[Byte](4000)
+    val a2 = new Array[Byte](4000)
+    val a3 = new Array[Byte](4000)
+    val a4 = new Array[Byte](4000)
     // First store a1 and a2, both in memory, and a3, on disk only
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
@@ -656,14 +640,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("in-memory LRU with streams") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list1 = List(new Array[Byte](200), new Array[Byte](200))
-    val list2 = List(new Array[Byte](200), new Array[Byte](200))
-    val list3 = List(new Array[Byte](200), new Array[Byte](200))
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store = makeBlockManager(12000)
+    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     assert(store.get("list3").isDefined, "list3 was not in store")
@@ -672,7 +655,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(store.get("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list1").isDefined, "list1 was not in store")
     assert(store.get("list1").get.data.size === 2)
     assert(store.get("list2").isDefined, "list2 was not in store")
@@ -681,16 +664,15 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("LRU with mixed storage levels and streams") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list1 = List(new Array[Byte](200), new Array[Byte](200))
-    val list2 = List(new Array[Byte](200), new Array[Byte](200))
-    val list3 = List(new Array[Byte](200), new Array[Byte](200))
-    val list4 = List(new Array[Byte](200), new Array[Byte](200))
+    store = makeBlockManager(12000)
+    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
+    val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
     // First store list1 and list2, both in memory, and list3, on disk only
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
-    store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.putIterator("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     val listForSizeEstimate = new ArrayBuffer[Any]
     listForSizeEstimate ++= list1.iterator
     val listSize = SizeEstimator.estimate(listForSizeEstimate)
@@ -708,7 +690,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(store.get("list3").isDefined, "list3 was not in store")
     assert(store.get("list3").get.data.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
-    store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
+    store.putIterator("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
     assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2").isDefined, "list2 was not in store")
     assert(store.get("list2").get.data.size === 2)
@@ -731,11 +713,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("overly large block") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 500, conf,
-      securityMgr, mapOutputTracker)
-    store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+    store = makeBlockManager(5000)
+    store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingle("a1") === None, "a1 was in store")
-    store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
     assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
     assert(store.getSingle("a2").isDefined, "a2 was not in store")
   }
@@ -743,8 +724,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   test("block compression") {
     try {
       conf.set("spark.shuffle.compress", "true")
-      store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
+      store = makeBlockManager(20000, "exec1")
       store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
@@ -752,52 +732,46 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
       store = null
 
       conf.set("spark.shuffle.compress", "false")
-      store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000,
+      store = makeBlockManager(20000, "exec2")
+      store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
         "shuffle_0_0_0 was compressed")
       store.stop()
       store = null
 
       conf.set("spark.broadcast.compress", "true")
-      store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100,
+      store = makeBlockManager(20000, "exec3")
+      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
         "broadcast_0 was not compressed")
       store.stop()
       store = null
 
       conf.set("spark.broadcast.compress", "false")
-      store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed")
+      store = makeBlockManager(20000, "exec4")
+      store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "true")
-      store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed")
+      store = makeBlockManager(20000, "exec5")
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
       store.stop()
       store = null
 
       conf.set("spark.rdd.compress", "false")
-      store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
-      assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed")
+      store = makeBlockManager(20000, "exec6")
+      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
-      store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf,
-        securityMgr, mapOutputTracker)
-      store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
-      assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
+      store = makeBlockManager(20000, "exec7")
+      store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+      assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
       store.stop()
       store = null
     } finally {
@@ -871,30 +845,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(Arrays.equals(mappedAsArray, bytes))
     assert(Arrays.equals(notMappedAsArray, bytes))
   }
-  
+
   test("updated block statuses") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list = List.fill(2)(new Array[Byte](200))
-    val bigList = List.fill(8)(new Array[Byte](200))
+    store = makeBlockManager(12000)
+    val list = List.fill(2)(new Array[Byte](2000))
+    val bigList = List.fill(8)(new Array[Byte](2000))
 
     // 1 updated block (i.e. list1)
     val updatedBlocks1 =
-      store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(updatedBlocks1.size === 1)
     assert(updatedBlocks1.head._1 === TestBlockId("list1"))
     assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
 
     // 1 updated block (i.e. list2)
     val updatedBlocks2 =
-      store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+      store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     assert(updatedBlocks2.size === 1)
     assert(updatedBlocks2.head._1 === TestBlockId("list2"))
     assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
 
     // 2 updated blocks - list1 is kicked out of memory while list3 is added
     val updatedBlocks3 =
-      store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(updatedBlocks3.size === 2)
     updatedBlocks3.foreach { case (id, status) =>
       id match {
@@ -903,11 +876,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
         case _ => fail("Updated block is neither list1 nor list3")
       }
     }
-    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
 
     // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
     val updatedBlocks4 =
-      store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(updatedBlocks4.size === 2)
     updatedBlocks4.foreach { case (id, status) =>
       id match {
@@ -916,26 +889,37 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
         case _ => fail("Updated block is neither list2 nor list4")
       }
     }
-    assert(store.get("list4").isDefined, "list4 was not in store")
+    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
 
-    // No updated blocks - nothing is kicked out of memory because list5 is too big to be added
+    // No updated blocks - list5 is too big to fit in store and nothing is kicked out
     val updatedBlocks5 =
-      store.put("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(updatedBlocks5.size === 0)
-    assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list4").isDefined, "list4 was not in store")
-    assert(!store.get("list5").isDefined, "list5 was in store")
+
+    // memory store contains only list3 and list4
+    assert(!store.memoryStore.contains("list1"), "list1 was in memory store")
+    assert(!store.memoryStore.contains("list2"), "list2 was in memory store")
+    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
+    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
+    assert(!store.memoryStore.contains("list5"), "list5 was in memory store")
+
+    // disk store contains only list2
+    assert(!store.diskStore.contains("list1"), "list1 was in disk store")
+    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+    assert(!store.diskStore.contains("list3"), "list3 was in disk store")
+    assert(!store.diskStore.contains("list4"), "list4 was in disk store")
+    assert(!store.diskStore.contains("list5"), "list5 was in disk store")
   }
 
   test("query block statuses") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list = List.fill(2)(new Array[Byte](200))
+    store = makeBlockManager(12000)
+    val list = List.fill(2)(new Array[Byte](2000))
 
     // Tell master. By LRU, only list2 and list3 remains.
-    store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getLocations("list1").size === 0)
@@ -949,9 +933,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
 
     // This time don't tell master and see what happens. By LRU, only list5 and list6 remains.
-    store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
-    store.put("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.put("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.putIterator("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIterator("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // getLocations should return nothing because the master is not informed
     // getBlockStatus without asking slaves should have the same result
@@ -968,23 +952,22 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("get matching blocks") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    val list = List.fill(2)(new Array[Byte](10))
+    store = makeBlockManager(12000)
+    val list = List.fill(2)(new Array[Byte](100))
 
     // insert some blocks
-    store.put("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.put("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIterator("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
     assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
 
     // insert some more blocks
-    store.put("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    store.put("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-    store.put("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.putIterator("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
     assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
@@ -992,7 +975,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
     blockIds.foreach { blockId =>
-      store.put(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+      store.putIterator(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
     val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
       case RDDBlockId(1, _) => true
@@ -1002,17 +985,240 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }
 
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
-    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
-      securityMgr, mapOutputTracker)
-    store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
-    store.putSingle(rdd(1, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store = makeBlockManager(12000)
+    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
     assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
     // According to the same-RDD rule, rdd_1_0 should be replaced here.
-    store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
     // rdd_1_0 should have been replaced, even it's not least recently used.
     assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
     assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
     assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
   }
+
+  test("reserve/release unroll memory") {
+    store = makeBlockManager(12000)
+    val memoryStore = store.memoryStore
+    assert(memoryStore.currentUnrollMemory === 0)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Reserve
+    memoryStore.reserveUnrollMemoryForThisThread(100)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 100)
+    memoryStore.reserveUnrollMemoryForThisThread(200)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 300)
+    memoryStore.reserveUnrollMemoryForThisThread(500)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 800)
+    memoryStore.reserveUnrollMemoryForThisThread(1000000)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 800) // not granted
+    // Release
+    memoryStore.releaseUnrollMemoryForThisThread(100)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 700)
+    memoryStore.releaseUnrollMemoryForThisThread(100)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 600)
+    // Reserve again
+    memoryStore.reserveUnrollMemoryForThisThread(4400)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 5000)
+    memoryStore.reserveUnrollMemoryForThisThread(20000)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 5000) // not granted
+    // Release again
+    memoryStore.releaseUnrollMemoryForThisThread(1000)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 4000)
+    memoryStore.releaseUnrollMemoryForThisThread() // release all
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+  }
+
+  /**
+   * Verify the result of MemoryStore#unrollSafely is as expected.
+   */
+  private def verifyUnroll(
+      expected: Iterator[Any],
+      result: Either[Array[Any], Iterator[Any]],
+      shouldBeArray: Boolean): Unit = {
+    val actual: Iterator[Any] = result match {
+      case Left(arr: Array[Any]) =>
+        assert(shouldBeArray, "expected iterator from unroll!")
+        arr.iterator
+      case Right(it: Iterator[Any]) =>
+        assert(!shouldBeArray, "expected array from unroll!")
+        it
+      case _ =>
+        fail("unroll returned neither an iterator nor an array...")
+    }
+    expected.zip(actual).foreach { case (e, a) =>
+      assert(e === a, "unroll did not return original values!")
+    }
+  }
+
+  test("safely unroll blocks") {
+    store = makeBlockManager(12000)
+    val smallList = List.fill(40)(new Array[Byte](100))
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    val memoryStore = store.memoryStore
+    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll with all the space in the world. This should succeed and return an array.
+    var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+    verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll with not enough space. This should succeed after kicking out someBlock1.
+    store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+    verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(droppedBlocks.size === 1)
+    assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
+    droppedBlocks.clear()
+
+    // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
+    // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
+    // In the mean time, however, we kicked out someBlock2 before giving up.
+    store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+    unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks)
+    verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
+    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+    assert(droppedBlocks.size === 1)
+    assert(droppedBlocks.head._1 === TestBlockId("someBlock2"))
+    droppedBlocks.clear()
+  }
+
+  test("safely unroll blocks through putIterator") {
+    store = makeBlockManager(12000)
+    val memOnly = StorageLevel.MEMORY_ONLY
+    val memoryStore = store.memoryStore
+    val smallList = List.fill(40)(new Array[Byte](100))
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+    def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll with plenty of space. This should succeed and cache both blocks.
+    val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+    val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+    assert(memoryStore.contains("b1"))
+    assert(memoryStore.contains("b2"))
+    assert(result1.size > 0) // unroll was successful
+    assert(result2.size > 0)
+    assert(result1.data.isLeft) // unroll did not drop this block to disk
+    assert(result2.data.isLeft)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
+    // would not know how to drop them from memory later.
+    memoryStore.remove("b1")
+    memoryStore.remove("b2")
+    store.putIterator("b1", smallIterator, memOnly)
+    store.putIterator("b2", smallIterator, memOnly)
+
+    // Unroll with not enough space. This should succeed but kick out b1 in the process.
+    val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+    assert(result3.size > 0)
+    assert(result3.data.isLeft)
+    assert(!memoryStore.contains("b1"))
+    assert(memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    memoryStore.remove("b3")
+    store.putIterator("b3", smallIterator, memOnly)
+
+    // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+    val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
+    assert(result4.size === 0) // unroll was unsuccessful
+    assert(result4.data.isLeft)
+    assert(!memoryStore.contains("b1"))
+    assert(!memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(!memoryStore.contains("b4"))
+    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+  }
+
+  /**
+   * This test is essentially identical to the preceding one, except that it uses MEMORY_AND_DISK.
+   */
+  test("safely unroll blocks through putIterator (disk)") {
+    store = makeBlockManager(12000)
+    val memAndDisk = StorageLevel.MEMORY_AND_DISK
+    val memoryStore = store.memoryStore
+    val diskStore = store.diskStore
+    val smallList = List.fill(40)(new Array[Byte](100))
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+    def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    store.putIterator("b1", smallIterator, memAndDisk)
+    store.putIterator("b2", smallIterator, memAndDisk)
+
+    // Unroll with not enough space. This should succeed but kick out b1 in the process.
+    // Memory store should contain b2 and b3, while disk store should contain only b1
+    val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true)
+    assert(result3.size > 0)
+    assert(!memoryStore.contains("b1"))
+    assert(memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(diskStore.contains("b1"))
+    assert(!diskStore.contains("b2"))
+    assert(!diskStore.contains("b3"))
+    memoryStore.remove("b3")
+    store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll huge block with not enough space. This should fail and drop the new block to disk
+    // directly in addition to kicking out b2 in the process. Memory store should contain only
+    // b3, while disk store should contain b1, b2 and b4.
+    val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true)
+    assert(result4.size > 0)
+    assert(result4.data.isRight) // unroll returned bytes from disk
+    assert(!memoryStore.contains("b1"))
+    assert(!memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(!memoryStore.contains("b4"))
+    assert(diskStore.contains("b1"))
+    assert(diskStore.contains("b2"))
+    assert(!diskStore.contains("b3"))
+    assert(diskStore.contains("b4"))
+    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+  }
+
+  test("multiple unrolls by the same thread") {
+    store = makeBlockManager(12000)
+    val memOnly = StorageLevel.MEMORY_ONLY
+    val memoryStore = store.memoryStore
+    val smallList = List.fill(40)(new Array[Byte](100))
+    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // All unroll memory used is released because unrollSafely returned an array
+    memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+    // Unroll memory is not released because unrollSafely returned an iterator
+    // that still depends on the underlying vector used in the process
+    memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisThread
+    assert(unrollMemoryAfterB3 > 0)
+
+    // The unroll memory owned by this thread builds on top of its value after the previous unrolls
+    memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisThread
+    assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
+
+    // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
+    memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisThread
+    memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisThread
+    memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true)
+    val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisThread
+    assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
+    assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
+    assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
deleted file mode 100644
index 93f0c6a8e64089138e2e1ef3433942a3e0eb0982..0000000000000000000000000000000000000000
--- a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.util.Random
-
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass
-import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
-
-class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll {
-  val NORMAL_ERROR = 0.20
-  val HIGH_ERROR = 0.30
-
-  test("fixed size insertions") {
-    testWith[Int, Long](10000, i => (i, i.toLong))
-    testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
-    testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass()))
-  }
-
-  test("variable size insertions") {
-    val rand = new Random(123456789)
-    def randString(minLen: Int, maxLen: Int): String = {
-      "a" * (rand.nextInt(maxLen - minLen) + minLen)
-    }
-    testWith[Int, String](10000, i => (i, randString(0, 10)))
-    testWith[Int, String](10000, i => (i, randString(0, 100)))
-    testWith[Int, String](10000, i => (i, randString(90, 100)))
-  }
-
-  test("updates") {
-    val rand = new Random(123456789)
-    def randString(minLen: Int, maxLen: Int): String = {
-      "a" * (rand.nextInt(maxLen - minLen) + minLen)
-    }
-    testWith[String, Int](10000, i => (randString(0, 10000), i))
-  }
-
-  def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
-    val map = new SizeTrackingAppendOnlyMap[K, V]()
-    for (i <- 0 until numElements) {
-      val (k, v) = makeElement(i)
-      map(k) = v
-      expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
-    }
-  }
-
-  def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
-    val betterEstimatedSize = SizeEstimator.estimate(obj)
-    assert(betterEstimatedSize * (1 - error) < estimatedSize,
-      s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
-    assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
-      s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
-  }
-}
-
-object SizeTrackingAppendOnlyMapSuite {
-  // Speed test, for reproducibility of results.
-  // These could be highly non-deterministic in general, however.
-  // Results:
-  // AppendOnlyMap:   31 ms
-  // SizeTracker:     54 ms
-  // SizeEstimator: 1500 ms
-  def main(args: Array[String]) {
-    val numElements = 100000
-
-    val baseTimes = for (i <- 0 until 10) yield time {
-      val map = new AppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-      }
-    }
-
-    val sampledTimes = for (i <- 0 until 10) yield time {
-      val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-        map.estimateSize()
-      }
-    }
-
-    val unsampledTimes = for (i <- 0 until 3) yield time {
-      val map = new AppendOnlyMap[Int, LargeDummyClass]()
-      for (i <- 0 until numElements) {
-        map(i) = new LargeDummyClass()
-        SizeEstimator.estimate(map)
-      }
-    }
-
-    println("Base: " + baseTimes)
-    println("SizeTracker (sampled): " + sampledTimes)
-    println("SizeEstimator (unsampled): " + unsampledTimes)
-  }
-
-  def time(f: => Unit): Long = {
-    val start = System.currentTimeMillis()
-    f
-    System.currentTimeMillis() - start
-  }
-
-  private class LargeDummyClass {
-    val arr = new Array[Int](100)
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1f33967249654d04fef36a4b4e819a6855ac0fbf
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.SizeEstimator
+
+class SizeTrackerSuite extends FunSuite {
+  val NORMAL_ERROR = 0.20
+  val HIGH_ERROR = 0.30
+
+  import SizeTrackerSuite._
+
+  test("vector fixed size insertions") {
+    testVector[Long](10000, i => i.toLong)
+    testVector[(Long, Long)](10000, i => (i.toLong, i.toLong))
+    testVector[LargeDummyClass](10000, i => new LargeDummyClass)
+  }
+
+  test("vector variable size insertions") {
+    val rand = new Random(123456789)
+    def randString(minLen: Int, maxLen: Int): String = {
+      "a" * (rand.nextInt(maxLen - minLen) + minLen)
+    }
+    testVector[String](10000, i => randString(0, 10))
+    testVector[String](10000, i => randString(0, 100))
+    testVector[String](10000, i => randString(90, 100))
+  }
+
+  test("map fixed size insertions") {
+    testMap[Int, Long](10000, i => (i, i.toLong))
+    testMap[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
+    testMap[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass))
+  }
+
+  test("map variable size insertions") {
+    val rand = new Random(123456789)
+    def randString(minLen: Int, maxLen: Int): String = {
+      "a" * (rand.nextInt(maxLen - minLen) + minLen)
+    }
+    testMap[Int, String](10000, i => (i, randString(0, 10)))
+    testMap[Int, String](10000, i => (i, randString(0, 100)))
+    testMap[Int, String](10000, i => (i, randString(90, 100)))
+  }
+
+  test("map updates") {
+    val rand = new Random(123456789)
+    def randString(minLen: Int, maxLen: Int): String = {
+      "a" * (rand.nextInt(maxLen - minLen) + minLen)
+    }
+    testMap[String, Int](10000, i => (randString(0, 10000), i))
+  }
+
+  def testVector[T: ClassTag](numElements: Int, makeElement: Int => T) {
+    val vector = new SizeTrackingVector[T]
+    for (i <- 0 until numElements) {
+      val item = makeElement(i)
+      vector += item
+      expectWithinError(vector, vector.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+    }
+  }
+
+  def testMap[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
+    val map = new SizeTrackingAppendOnlyMap[K, V]
+    for (i <- 0 until numElements) {
+      val (k, v) = makeElement(i)
+      map(k) = v
+      expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+    }
+  }
+
+  def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
+    val betterEstimatedSize = SizeEstimator.estimate(obj)
+    assert(betterEstimatedSize * (1 - error) < estimatedSize,
+      s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
+    assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
+      s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
+  }
+}
+
+private object SizeTrackerSuite {
+
+  /**
+   * Run speed tests for size tracking collections.
+   */
+  def main(args: Array[String]): Unit = {
+    if (args.size < 1) {
+      println("Usage: SizeTrackerSuite [num elements]")
+      System.exit(1)
+    }
+    val numElements = args(0).toInt
+    vectorSpeedTest(numElements)
+    mapSpeedTest(numElements)
+  }
+
+  /**
+   * Speed test for SizeTrackingVector.
+   *
+   * Results for 100000 elements (possibly non-deterministic):
+   *   PrimitiveVector  15 ms
+   *   SizeTracker      51 ms
+   *   SizeEstimator    2000 ms
+   */
+  def vectorSpeedTest(numElements: Int): Unit = {
+    val baseTimes = for (i <- 0 until 10) yield time {
+      val vector = new PrimitiveVector[LargeDummyClass]
+      for (i <- 0 until numElements) {
+        vector += new LargeDummyClass
+      }
+    }
+    val sampledTimes = for (i <- 0 until 10) yield time {
+      val vector = new SizeTrackingVector[LargeDummyClass]
+      for (i <- 0 until numElements) {
+        vector += new LargeDummyClass
+        vector.estimateSize()
+      }
+    }
+    val unsampledTimes = for (i <- 0 until 3) yield time {
+      val vector = new PrimitiveVector[LargeDummyClass]
+      for (i <- 0 until numElements) {
+        vector += new LargeDummyClass
+        SizeEstimator.estimate(vector)
+      }
+    }
+    printSpeedTestResult("SizeTrackingVector", baseTimes, sampledTimes, unsampledTimes)
+  }
+
+  /**
+   * Speed test for SizeTrackingAppendOnlyMap.
+   *
+   * Results for 100000 elements (possibly non-deterministic):
+   *   AppendOnlyMap  30 ms
+   *   SizeTracker    41 ms
+   *   SizeEstimator  1666 ms
+   */
+  def mapSpeedTest(numElements: Int): Unit = {
+    val baseTimes = for (i <- 0 until 10) yield time {
+      val map = new AppendOnlyMap[Int, LargeDummyClass]
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass
+      }
+    }
+    val sampledTimes = for (i <- 0 until 10) yield time {
+      val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass
+        map.estimateSize()
+      }
+    }
+    val unsampledTimes = for (i <- 0 until 3) yield time {
+      val map = new AppendOnlyMap[Int, LargeDummyClass]
+      for (i <- 0 until numElements) {
+        map(i) = new LargeDummyClass
+        SizeEstimator.estimate(map)
+      }
+    }
+    printSpeedTestResult("SizeTrackingAppendOnlyMap", baseTimes, sampledTimes, unsampledTimes)
+  }
+
+  def printSpeedTestResult(
+      testName: String,
+      baseTimes: Seq[Long],
+      sampledTimes: Seq[Long],
+      unsampledTimes: Seq[Long]): Unit = {
+    println(s"Average times for $testName (ms):")
+    println("  Base - " + averageTime(baseTimes))
+    println("  SizeTracker (sampled) - " + averageTime(sampledTimes))
+    println("  SizeEstimator (unsampled) - " + averageTime(unsampledTimes))
+    println()
+  }
+
+  def time(f: => Unit): Long = {
+    val start = System.currentTimeMillis()
+    f
+    System.currentTimeMillis() - start
+  }
+
+  def averageTime(v: Seq[Long]): Long = {
+    v.sum / v.size
+  }
+
+  private class LargeDummyClass {
+    val arr = new Array[Int](100)
+  }
+}
diff --git a/docs/configuration.md b/docs/configuration.md
index 46e3dd914b5acf66c0dac2c140556a1d5a22276c..2e6c85cc2bccabc82fb3eb0fc58589e63fb55bd2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -480,6 +480,15 @@ Apart from these, the following properties are also available, and may be useful
     increase it if you configure your own old generation size.
   </td>
 </tr>
+<tr>
+  <td><code>spark.storage.unrollFraction</code></td>
+  <td>0.2</td>
+  <td>
+    Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory.
+    This is dynamically allocated by dropping existing blocks when there is not enough free
+    storage space to unroll the new block in its entirety.
+  </td>
+</tr>
 <tr>
   <td><code>spark.tachyonStore.baseDir</code></td>
   <td>System.getProperty("java.io.tmpdir")</td>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index e9220db6b1f9ab3e884c922f7795a82b143bef83..5ff88f0dd1cac7cdd0e315b8d4eb66d94b309ee2 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -31,7 +31,6 @@ import com.typesafe.tools.mima.core._
  * MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap")
  */
 object MimaExcludes {
-
     def excludes(version: String) =
       version match {
         case v if v.startsWith("1.1") =>
@@ -62,6 +61,15 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.storage.MemoryStore.Entry")
           ) ++
+          Seq(
+            // Renamed putValues -> putArray + putIterator
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.storage.MemoryStore.putValues"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.storage.DiskStore.putValues"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.storage.TachyonStore.putValues")
+          ) ++
           Seq(
             ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
           ) ++
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index ce8316bb148912bd8020826ffce5c16a6ef50efa..d934b9cbfc3e8fb6e78f00304878186729006b57 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -110,8 +110,7 @@ private[streaming] class ReceiverSupervisorImpl(
     ) {
     val blockId = optionalBlockId.getOrElse(nextBlockId)
     val time = System.currentTimeMillis
-    blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
-      storageLevel, tellMaster = true)
+    blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
     logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time)  + " ms")
     reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
   }
@@ -124,7 +123,7 @@ private[streaming] class ReceiverSupervisorImpl(
     ) {
     val blockId = optionalBlockId.getOrElse(nextBlockId)
     val time = System.currentTimeMillis
-    blockManager.put(blockId, iterator, storageLevel, tellMaster = true)
+    blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
     logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time)  + " ms")
     reportPushedBlock(blockId, -1, optionalMetadata)
   }