diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
index 4c8802af7ae673d32e0f0bdb7563ec61327376f7..acc49d968c1860a63fcf270314a483ef32195f54 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
@@ -28,7 +28,7 @@ import io.netty.buffer.ByteBufInputStream;
 /**
  * A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
  */
-public final class NettyManagedBuffer extends ManagedBuffer {
+public class NettyManagedBuffer extends ManagedBuffer {
   private final ByteBuf buf;
 
   public NettyManagedBuffer(ByteBuf buf) {
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 2634d88367669b93f2b2a90a0212a3c00652a0f1..e5e6a9e4a816c5377ef2159b2ba280d1396d4699 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -30,7 +30,7 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
 import org.apache.spark.util.{ByteBufferInputStream, Utils}
-import org.apache.spark.util.io.ByteArrayChunkOutputStream
+import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
 
 /**
  * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
@@ -107,7 +107,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
     blocks.zipWithIndex.foreach { case (block, i) =>
       val pieceId = BroadcastBlockId(id, "piece" + i)
-      if (!blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
+      val bytes = new ChunkedByteBuffer(block.duplicate())
+      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
         throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
       }
     }
@@ -115,10 +116,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   }
 
   /** Fetch torrent blocks from the driver and/or other executors. */
-  private def readBlocks(): Array[ByteBuffer] = {
+  private def readBlocks(): Array[ChunkedByteBuffer] = {
     // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
     // to the driver, so other executors can pull these chunks from this executor as well.
-    val blocks = new Array[ByteBuffer](numBlocks)
+    val blocks = new Array[ChunkedByteBuffer](numBlocks)
     val bm = SparkEnv.get.blockManager
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
@@ -182,7 +183,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
         case None =>
           logInfo("Started reading broadcast variable " + id)
           val startTimeMs = System.currentTimeMillis()
-          val blocks = readBlocks()
+          val blocks = readBlocks().flatMap(_.getChunks())
           logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
 
           val obj = TorrentBroadcast.unBlockifyObject[T](
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6327d55fe75c21c8b1173ad973969b35339f08f0..3201463b8cc972b21ed49b10b9c6d27ff261e5a9 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -36,6 +36,7 @@ import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTa
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Spark executor, backed by a threadpool to run tasks.
@@ -297,7 +298,9 @@ private[spark] class Executor(
           } else if (resultSize > maxDirectResultSize) {
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
-              blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
+              blockId,
+              new ChunkedByteBuffer(serializedDirectResult.duplicate()),
+              StorageLevel.MEMORY_AND_DISK_SER)
             logInfo(
               s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
             ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 7eb6d53c109509b277fe5b3c3322d068fadd4e99..873f1b56bd18b4838ef7b0b031c3ecb15234deb8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -83,7 +83,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
                 return
               }
               val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
-                serializedTaskResult.get)
+                serializedTaskResult.get.toByteBuffer)
               sparkEnv.blockManager.master.removeBlock(blockId)
               (deserializedResult, size)
           }
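Taken together, these call-site hunks follow one convention: producers wrap a plain `java.nio.ByteBuffer` in a `ChunkedByteBuffer` (duplicating it so the caller's position and limit are untouched) before handing it to the `BlockManager`, and consumers call `toByteBuffer` wherever a single contiguous buffer is still required. A minimal sketch of that round trip, assuming a `blockManager`, a `taskId`, and a serialized `ByteBuffer` named `serializedDirectResult` are already in scope (those names are illustrative, borrowed from the hunks above):

```scala
import java.nio.ByteBuffer

import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.io.ChunkedByteBuffer

val blockId = TaskResultBlockId(taskId)

// Producer side: duplicate() shares the bytes but gives the wrapper its own position/limit.
blockManager.putBytes(
  blockId,
  new ChunkedByteBuffer(serializedDirectResult.duplicate()),
  StorageLevel.MEMORY_AND_DISK_SER)

// Consumer side: getRemoteBytes now returns Option[ChunkedByteBuffer]; flatten it back to a
// contiguous ByteBuffer only where an API (here, deserialization) still needs one.
val maybeBytes: Option[ByteBuffer] = blockManager.getRemoteBytes(blockId).map(_.toByteBuffer)
```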
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3bbdf48104c911d08a09d8106d2031f83c81335d..aa2561d8c389855f977565d8b68bc9017fad6f6d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.storage
 
 import java.io._
-import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -26,15 +26,13 @@ import scala.concurrent.duration._
 import scala.util.Random
 import scala.util.control.NonFatal
 
-import sun.nio.ch.DirectBuffer
-
 import org.apache.spark._
 import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.memory.MemoryManager
 import org.apache.spark.network._
-import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.ExternalShuffleClient
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
@@ -43,6 +41,7 @@ import org.apache.spark.serializer.{Serializer, SerializerInstance}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.memory._
 import org.apache.spark.util._
+import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
 
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
@@ -296,7 +295,7 @@ private[spark] class BlockManager(
    * Put the block locally, using the given storage level.
    */
   override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = {
-    putBytes(blockId, data.nioByteBuffer(), level)
+    putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)
   }
 
   /**
@@ -444,7 +443,7 @@ private[spark] class BlockManager(
   /**
    * Get block from the local block manager as serialized bytes.
    */
-  def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
+  def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
     logDebug(s"Getting local block $blockId as bytes")
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
@@ -453,7 +452,8 @@ private[spark] class BlockManager(
       // TODO: This should gracefully handle case where local block is not available. Currently
       // downstream code will throw an exception.
       Option(
-        shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
+        new ChunkedByteBuffer(
+          shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()))
     } else {
       blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
     }
@@ -465,7 +465,7 @@ private[spark] class BlockManager(
    * Must be called while holding a read lock on the block.
    * Releases the read lock upon exception; keeps the read lock upon successful return.
    */
-  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = {
+  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = {
     val level = info.level
     logDebug(s"Level for block $blockId is $level")
     // In order, try to read the serialized bytes from memory, then from disk, then fall back to
@@ -504,7 +504,7 @@ private[spark] class BlockManager(
    */
   def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
     getRemoteBytes(blockId).map { data =>
-      new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit())
+      new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.size)
     }
   }
 
@@ -521,7 +521,7 @@ private[spark] class BlockManager(
   /**
    * Get block from remote block managers as serialized bytes.
    */
-  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
     logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
     var runningFailureCount = 0
@@ -567,7 +567,7 @@ private[spark] class BlockManager(
       }
 
       if (data != null) {
-        return Some(data)
+        return Some(new ChunkedByteBuffer(data))
       }
       logDebug(s"The value of block $blockId is null")
     }
@@ -705,7 +705,7 @@ private[spark] class BlockManager(
    */
   def putBytes(
       blockId: BlockId,
-      bytes: ByteBuffer,
+      bytes: ChunkedByteBuffer,
       level: StorageLevel,
       tellMaster: Boolean = true): Boolean = {
     require(bytes != null, "Bytes is null")
@@ -725,7 +725,7 @@ private[spark] class BlockManager(
    */
   private def doPutBytes(
       blockId: BlockId,
-      bytes: ByteBuffer,
+      bytes: ChunkedByteBuffer,
       level: StorageLevel,
       tellMaster: Boolean = true,
       keepReadLock: Boolean = false): Boolean = {
@@ -734,25 +734,22 @@ private[spark] class BlockManager(
       // Since we're storing bytes, initiate the replication before storing them locally.
       // This is faster as data is already serialized and ready to send.
       val replicationFuture = if (level.replication > 1) {
-        // Duplicate doesn't copy the bytes, but just creates a wrapper
-        val bufferView = bytes.duplicate()
         Future {
           // This is a blocking action and should run in futureExecutionContext which is a cached
           // thread pool
-          replicate(blockId, bufferView, level)
+          replicate(blockId, bytes, level)
         }(futureExecutionContext)
       } else {
         null
       }
 
-      bytes.rewind()
-      val size = bytes.limit()
+      val size = bytes.size
 
       if (level.useMemory) {
         // Put it in memory first, even if it also has useDisk set to true;
         // We will drop it to disk later if the memory store can't hold it.
         val putSucceeded = if (level.deserialized) {
-          val values = dataDeserialize(blockId, bytes.duplicate())
+          val values = dataDeserialize(blockId, bytes)
           memoryStore.putIterator(blockId, values, level) match {
             case Right(_) => true
             case Left(iter) =>
@@ -922,7 +919,7 @@ private[spark] class BlockManager(
           try {
             replicate(blockId, bytesToReplicate, level)
           } finally {
-            BlockManager.dispose(bytesToReplicate)
+            bytesToReplicate.dispose()
           }
           logDebug("Put block %s remotely took %s"
             .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
@@ -944,29 +941,27 @@ private[spark] class BlockManager(
       blockInfo: BlockInfo,
       blockId: BlockId,
       level: StorageLevel,
-      diskBytes: ByteBuffer): ByteBuffer = {
+      diskBytes: ChunkedByteBuffer): ChunkedByteBuffer = {
     require(!level.deserialized)
     if (level.useMemory) {
       // Synchronize on blockInfo to guard against a race condition where two readers both try to
       // put values read from disk into the MemoryStore.
       blockInfo.synchronized {
         if (memoryStore.contains(blockId)) {
-          BlockManager.dispose(diskBytes)
+          diskBytes.dispose()
           memoryStore.getBytes(blockId).get
         } else {
-          val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
+          val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, () => {
             // https://issues.apache.org/jira/browse/SPARK-6076
             // If the file size is bigger than the free memory, OOM will happen. So if we
             // cannot put it into MemoryStore, copyForMemory should not be created. That's why
-            // this action is put into a `() => ByteBuffer` and created lazily.
-            val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
-            copyForMemory.put(diskBytes)
+            // this action is put into a `() => ChunkedByteBuffer` and created lazily.
+            diskBytes.copy()
           })
           if (putSucceeded) {
-            BlockManager.dispose(diskBytes)
+            diskBytes.dispose()
             memoryStore.getBytes(blockId).get
           } else {
-            diskBytes.rewind()
             diskBytes
           }
         }
@@ -1032,7 +1027,7 @@ private[spark] class BlockManager(
    * Replicate block to another node. Note that this is a blocking call that returns after
    * the block has been replicated.
    */
-  private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+  private def replicate(blockId: BlockId, data: ChunkedByteBuffer, level: StorageLevel): Unit = {
     val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
     val numPeersToReplicateTo = level.replication - 1
     val peersForReplication = new ArrayBuffer[BlockManagerId]
@@ -1085,11 +1080,15 @@ private[spark] class BlockManager(
         case Some(peer) =>
           try {
             val onePeerStartTime = System.currentTimeMillis
-            data.rewind()
-            logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
             blockTransferService.uploadBlockSync(
-              peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
-            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
+              peer.host,
+              peer.port,
+              peer.executorId,
+              blockId,
+              new NettyManagedBuffer(data.toNetty),
+              tLevel)
+            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
               .format(System.currentTimeMillis - onePeerStartTime))
             peersReplicatedTo += peer
             peersForReplication -= peer
@@ -1112,7 +1111,7 @@ private[spark] class BlockManager(
       }
     }
     val timeTakeMs = (System.currentTimeMillis - startTime)
-    logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
+    logDebug(s"Replicating $blockId of ${data.size} bytes to " +
       s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
     if (peersReplicatedTo.size < numPeersToReplicateTo) {
       logWarning(s"Block $blockId replicated to only " +
@@ -1154,7 +1153,7 @@ private[spark] class BlockManager(
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: () => Either[Array[Any], ByteBuffer]): StorageLevel = {
+      data: () => Either[Array[Any], ChunkedByteBuffer]): StorageLevel = {
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
     var blockIsUpdated = false
@@ -1281,11 +1280,11 @@ private[spark] class BlockManager(
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
-  /** Serializes into a byte buffer. */
-  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
-    val byteStream = new ByteBufferOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values)
-    byteStream.toByteBuffer
+  /** Serializes into a chunked byte buffer. */
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ChunkedByteBuffer = {
+    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4)
+    dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
+    new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
   }
 
   /**
@@ -1297,6 +1296,14 @@ private[spark] class BlockManager(
     dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }
 
+  /**
+   * Deserializes a ChunkedByteBuffer into an iterator of values and disposes of it when the
+   * end of the iterator is reached.
+   */
+  def dataDeserialize(blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[Any] = {
+    dataDeserializeStream(blockId, bytes.toInputStream(dispose = true))
+  }
+
   /**
    * Deserializes an InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
@@ -1325,24 +1332,9 @@ private[spark] class BlockManager(
 }
 
 
-private[spark] object BlockManager extends Logging {
+private[spark] object BlockManager {
   private val ID_GENERATOR = new IdGenerator
 
-  /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
-   */
-  def dispose(buffer: ByteBuffer): Unit = {
-    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logTrace(s"Unmapping $buffer")
-      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
-        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
-      }
-    }
-  }
-
   def blockIdsToHosts(
       blockIds: Array[BlockId],
       env: SparkEnv,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
index 5886b9c00b557cf97136dfcf23904b7da9d12fcf..12594e6a2bc0ce4f34e3d9816e5881b8515759e2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.storage
 
-import java.nio.ByteBuffer
-
-import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
- * This [[ManagedBuffer]] wraps a [[ByteBuffer]] which was retrieved from the [[BlockManager]]
+ * This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the [[BlockManager]]
  * so that the corresponding block's read lock can be released once this buffer's references
  * are released.
  *
@@ -32,7 +31,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 private[storage] class BlockManagerManagedBuffer(
     blockManager: BlockManager,
     blockId: BlockId,
-    buf: ByteBuffer) extends NioManagedBuffer(buf) {
+    chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) {
 
   override def retain(): ManagedBuffer = {
     super.retain()
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 5c28357ded6d64b56a139d3507c02a53db765c3d..ca23e2391ed0294314daae05800663896fad483e 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -26,6 +26,7 @@ import com.google.common.io.Closeables
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Stores BlockManager blocks on disk.
@@ -71,23 +72,18 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       finishTime - startTime))
   }
 
-  def putBytes(blockId: BlockId, _bytes: ByteBuffer): Unit = {
-    // So that we do not modify the input offsets !
-    // duplicate does not copy buffer, so inexpensive
-    val bytes = _bytes.duplicate()
+  def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
     put(blockId) { fileOutputStream =>
       val channel = fileOutputStream.getChannel
       Utils.tryWithSafeFinally {
-        while (bytes.remaining > 0) {
-          channel.write(bytes)
-        }
+        bytes.writeFully(channel)
       } {
         channel.close()
       }
     }
   }
 
-  def getBytes(blockId: BlockId): ByteBuffer = {
+  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
     val file = diskManager.getFile(blockId.name)
     val channel = new RandomAccessFile(file, "r").getChannel
     Utils.tryWithSafeFinally {
@@ -102,9 +98,9 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
           }
         }
         buf.flip()
-        buf
+        new ChunkedByteBuffer(buf)
       } else {
-        channel.map(MapMode.READ_ONLY, 0, file.length)
+        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
       }
     } {
       channel.close()
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 43cd15921cc973c68ad60d88dd59796073eaee6e..199a5fc270a41367b737659f9758776c2ef7b92d 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.storage
 
+import java.nio.{ByteBuffer, MappedByteBuffer}
+
 import scala.collection.Map
 import scala.collection.mutable
 
+import sun.nio.ch.DirectBuffer
+
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
 
 /**
  * :: DeveloperApi ::
@@ -222,7 +227,22 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 }
 
 /** Helper methods for storage-related objects. */
-private[spark] object StorageUtils {
+private[spark] object StorageUtils extends Logging {
+
+  /**
+   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
+   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
+   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
+   * unfortunately no standard API to do this.
+   */
+  def dispose(buffer: ByteBuffer): Unit = {
+    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
+      logTrace(s"Unmapping $buffer")
+      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
+        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+      }
+    }
+  }
 
   /**
    * Update the given list of RDDInfo with the given list of storage statuses.
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index a7c1854a41ff73fc26ae553be90d0ffbfc6075fb..94171324f84b519e38a2fb7f828a652d0e3f61ed 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.storage.memory
 
-import java.nio.ByteBuffer
 import java.util.LinkedHashMap
 
 import scala.collection.mutable
@@ -29,8 +28,13 @@ import org.apache.spark.memory.MemoryManager
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}
 import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
+import org.apache.spark.util.io.ChunkedByteBuffer
 
-private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
+private sealed trait MemoryEntry {
+  val size: Long
+}
+private case class DeserializedMemoryEntry(value: Array[Any], size: Long) extends MemoryEntry
+private case class SerializedMemoryEntry(buffer: ChunkedByteBuffer, size: Long) extends MemoryEntry
 
 /**
  * Stores blocks in memory, either as Arrays of deserialized Java objects or as
@@ -91,14 +95,13 @@ private[spark] class MemoryStore(
    *
    * @return true if the put() succeeded, false otherwise.
    */
-  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = {
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ChunkedByteBuffer): Boolean = {
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     if (memoryManager.acquireStorageMemory(blockId, size)) {
       // We acquired enough memory for the block, so go ahead and put it
-      // Work on a duplicate - since the original input might be used elsewhere.
-      val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
-      assert(bytes.limit == size)
-      val entry = new MemoryEntry(bytes, size, deserialized = false)
+      val bytes = _bytes()
+      assert(bytes.size == size)
+      val entry = new SerializedMemoryEntry(bytes, size)
       entries.synchronized {
         entries.put(blockId, entry)
       }
@@ -184,10 +187,10 @@ private[spark] class MemoryStore(
       val arrayValues = vector.toArray
       vector = null
       val entry = if (level.deserialized) {
-        new MemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues), deserialized = true)
+        new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues))
       } else {
         val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
-        new MemoryEntry(bytes, bytes.limit, deserialized = false)
+        new SerializedMemoryEntry(bytes, bytes.size)
       }
       val size = entry.size
       def transferUnrollToStorage(amount: Long): Unit = {
@@ -241,27 +244,23 @@ private[spark] class MemoryStore(
     }
   }
 
-  def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    val entry = entries.synchronized {
-      entries.get(blockId)
-    }
-    if (entry == null) {
-      None
-    } else {
-      require(!entry.deserialized, "should only call getBytes on blocks stored in serialized form")
-      Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
+  def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+    val entry = entries.synchronized { entries.get(blockId) }
+    entry match {
+      case null => None
+      case _: DeserializedMemoryEntry =>
+        throw new IllegalArgumentException("should only call getBytes on serialized blocks")
+      case SerializedMemoryEntry(bytes, _) => Some(bytes)
     }
   }
 
   def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    val entry = entries.synchronized {
-      entries.get(blockId)
-    }
-    if (entry == null) {
-      None
-    } else {
-      require(entry.deserialized, "should only call getValues on deserialized blocks")
-      Some(entry.value.asInstanceOf[Array[Any]].iterator)
+    val entry = entries.synchronized { entries.get(blockId) }
+    entry match {
+      case null => None
+      case _: SerializedMemoryEntry =>
+        throw new IllegalArgumentException("should only call getValues on deserialized blocks")
+      case DeserializedMemoryEntry(values, _) => Some(values.iterator)
     }
   }
 
@@ -342,10 +341,9 @@ private[spark] class MemoryStore(
           // blocks and removing entries. However the check is still here for
           // future safety.
           if (entry != null) {
-            val data = if (entry.deserialized) {
-              Left(entry.value.asInstanceOf[Array[Any]])
-            } else {
-              Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+            val data = entry match {
+              case DeserializedMemoryEntry(values, _) => Left(values)
+              case SerializedMemoryEntry(buffer, _) => Right(buffer)
             }
             val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data)
             if (newEffectiveStorageLevel.isValid) {
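The `MemoryEntry` rewrite above trades the old boolean `deserialized` flag plus `asInstanceOf` casts for a small sealed hierarchy, so each accessor can pattern-match exhaustively. A self-contained sketch of that shape, with the `private` modifiers dropped purely so it compiles outside the package (everything else mirrors the hunk above; `describe` is an illustrative helper, not part of the patch):

```scala
import org.apache.spark.util.io.ChunkedByteBuffer

sealed trait MemoryEntry { val size: Long }
case class DeserializedMemoryEntry(value: Array[Any], size: Long) extends MemoryEntry
case class SerializedMemoryEntry(buffer: ChunkedByteBuffer, size: Long) extends MemoryEntry

// Exhaustive matching: forgetting a case is a compile-time warning instead of a runtime
// ClassCastException, and each payload comes out with its precise type.
def describe(entry: MemoryEntry): String = entry match {
  case DeserializedMemoryEntry(values, size) => s"${values.length} objects (~$size bytes)"
  case SerializedMemoryEntry(bytes, _)       => s"${bytes.size} serialized bytes"
}
```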
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
index 54de4d4ee8ca750bc1ea0292b803e4a81e5f54b2..dce2ac63a664cbfdc7ac303f40394bb62efc225b 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
@@ -20,10 +20,10 @@ package org.apache.spark.util
 import java.io.InputStream
 import java.nio.ByteBuffer
 
-import org.apache.spark.storage.BlockManager
+import org.apache.spark.storage.StorageUtils
 
 /**
- * Reads data from a ByteBuffer, and optionally cleans it up using BlockManager.dispose()
+ * Reads data from a ByteBuffer, and optionally cleans it up using StorageUtils.dispose()
  * at the end of the stream (e.g. to close a memory-mapped file).
  */
 private[spark]
@@ -68,12 +68,12 @@ class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = f
   }
 
   /**
-   * Clean up the buffer, and potentially dispose of it using BlockManager.dispose().
+   * Clean up the buffer, and potentially dispose of it using StorageUtils.dispose().
    */
   private def cleanUp() {
     if (buffer != null) {
       if (dispose) {
-        BlockManager.dispose(buffer)
+        StorageUtils.dispose(buffer)
       }
       buffer = null
     }
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
new file mode 100644
index 0000000000000000000000000000000000000000..c643c4b63c601ce8c7e6e0e995262f4f97444080
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.io
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import com.google.common.primitives.UnsignedBytes
+import io.netty.buffer.{ByteBuf, Unpooled}
+
+import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.storage.StorageUtils
+
+/**
+ * Read-only byte buffer which is physically stored as multiple chunks rather than a single
+ * contiguous array.
+ *
+ * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must be non-empty and have
+ *               position == 0. Ownership of these buffers is transferred to the ChunkedByteBuffer,
+ *               so if these buffers may also be used elsewhere then the caller is responsible for
+ *               copying them as needed.
+ */
+private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
+  require(chunks != null, "chunks must not be null")
+  require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
+  require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
+
+  /**
+   * The size of this buffer, in bytes.
+   */
+  val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum
+
+  def this(byteBuffer: ByteBuffer) = {
+    this(Array(byteBuffer))
+  }
+
+  /**
+   * Write this buffer to a channel.
+   */
+  def writeFully(channel: WritableByteChannel): Unit = {
+    for (bytes <- getChunks()) {
+      while (bytes.remaining > 0) {
+        channel.write(bytes)
+      }
+    }
+  }
+
+  /**
+   * Wrap this buffer to view it as a Netty ByteBuf.
+   */
+  def toNetty: ByteBuf = {
+    Unpooled.wrappedBuffer(getChunks(): _*)
+  }
+
+  /**
+   * Copy this buffer into a new byte array.
+   *
+   * @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size.
+   */
+  def toArray: Array[Byte] = {
+    if (size >= Integer.MAX_VALUE) {
+      throw new UnsupportedOperationException(
+        s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size")
+    }
+    val byteChannel = new ByteArrayWritableChannel(size.toInt)
+    writeFully(byteChannel)
+    byteChannel.close()
+    byteChannel.getData
+  }
+
+  /**
+   * Copy this buffer into a new ByteBuffer.
+   *
+   * @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size.
+   */
+  def toByteBuffer: ByteBuffer = {
+    if (chunks.length == 1) {
+      chunks.head.duplicate()
+    } else {
+      ByteBuffer.wrap(toArray)
+    }
+  }
+
+  /**
+   * Creates an input stream to read data from this ChunkedByteBuffer.
+   *
+   * @param dispose if true, [[dispose()]] will be called at the end of the stream
+   *                in order to close any memory-mapped files which back this buffer.
+   */
+  def toInputStream(dispose: Boolean = false): InputStream = {
+    new ChunkedByteBufferInputStream(this, dispose)
+  }
+
+  /**
+   * Get duplicates of the ByteBuffers backing this ChunkedByteBuffer.
+   */
+  def getChunks(): Array[ByteBuffer] = {
+    chunks.map(_.duplicate())
+  }
+
+  /**
+   * Make a copy of this ChunkedByteBuffer, copying all of the backing data into new buffers.
+   * The new buffer will share no resources with the original buffer.
+   */
+  def copy(): ChunkedByteBuffer = {
+    val copiedChunks = getChunks().map { chunk =>
+      // TODO: accept an allocator in this copy method to integrate with mem. accounting systems
+      val newChunk = ByteBuffer.allocate(chunk.limit())
+      newChunk.put(chunk)
+      newChunk.flip()
+      newChunk
+    }
+    new ChunkedByteBuffer(copiedChunks)
+  }
+
+  /**
+   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
+   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
+   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
+   * unfortunately no standard API to do this.
+   */
+  def dispose(): Unit = {
+    chunks.foreach(StorageUtils.dispose)
+  }
+}
+
+/**
+ * Reads data from a ChunkedByteBuffer.
+ *
+ * @param dispose if true, [[ChunkedByteBuffer.dispose()]] will be called at the end of the stream
+ *                in order to close any memory-mapped files which back the buffer.
+ */
+private class ChunkedByteBufferInputStream(
+    var chunkedByteBuffer: ChunkedByteBuffer,
+    dispose: Boolean)
+  extends InputStream {
+
+  private[this] var chunks = chunkedByteBuffer.getChunks().iterator
+  private[this] var currentChunk: ByteBuffer = {
+    if (chunks.hasNext) {
+      chunks.next()
+    } else {
+      null
+    }
+  }
+
+  override def read(): Int = {
+    if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) {
+      currentChunk = chunks.next()
+    }
+    if (currentChunk != null && currentChunk.hasRemaining) {
+      UnsignedBytes.toInt(currentChunk.get())
+    } else {
+      close()
+      -1
+    }
+  }
+
+  override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
+    if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) {
+      currentChunk = chunks.next()
+    }
+    if (currentChunk != null && currentChunk.hasRemaining) {
+      val amountToGet = math.min(currentChunk.remaining(), length)
+      currentChunk.get(dest, offset, amountToGet)
+      amountToGet
+    } else {
+      close()
+      -1
+    }
+  }
+
+  override def skip(bytes: Long): Long = {
+    if (currentChunk != null) {
+      val amountToSkip = math.min(bytes, currentChunk.remaining).toInt
+      currentChunk.position(currentChunk.position + amountToSkip)
+      if (currentChunk.remaining() == 0) {
+        if (chunks.hasNext) {
+          currentChunk = chunks.next()
+        } else {
+          close()
+        }
+      }
+      amountToSkip
+    } else {
+      0L
+    }
+  }
+
+  override def close(): Unit = {
+    if (chunkedByteBuffer != null && dispose) {
+      chunkedByteBuffer.dispose()
+    }
+    chunkedByteBuffer = null
+    chunks = null
+    currentChunk = null
+  }
+}
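For reference, a short sketch exercising the API added above; the byte values are arbitrary, and `ByteArrayWritableChannel` is the same test helper the new suite below uses:

```scala
import java.nio.ByteBuffer

import org.apache.spark.network.util.ByteArrayWritableChannel
import org.apache.spark.util.io.ChunkedByteBuffer

// Each chunk must be non-empty with position == 0, per the constructor's requirements.
val buf = new ChunkedByteBuffer(Array(
  ByteBuffer.wrap(Array[Byte](0, 1, 2, 3)),
  ByteBuffer.wrap(Array[Byte](4, 5, 6, 7))))

assert(buf.size == 8L)                        // sum of the chunks' limits
assert(buf.toArray.length == 8)               // contiguous copy; only legal below ~2 GB
assert(buf.getChunks().head.position() == 0)  // getChunks() hands out duplicates

// Streaming read crosses chunk boundaries; dispose = true would also unmap mapped chunks on close.
val in = buf.toInputStream(dispose = false)
assert(in.read() == 0)

// Bulk write: writeFully() drains duplicated chunks, leaving the originals' positions at 0.
val channel = new ByteArrayWritableChannel(buf.size.toInt)
buf.writeFully(channel)
channel.close()
assert(channel.getData.length == 8)
```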
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..aab70e7431e075b95e79ba790f2fc54ba2fc8bd5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferSuite extends SparkFunSuite {
+
+  test("no chunks") {
+    val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer])
+    assert(emptyChunkedByteBuffer.size === 0)
+    assert(emptyChunkedByteBuffer.getChunks().isEmpty)
+    assert(emptyChunkedByteBuffer.toArray === Array.empty)
+    assert(emptyChunkedByteBuffer.toByteBuffer.capacity() === 0)
+    assert(emptyChunkedByteBuffer.toNetty.capacity() === 0)
+    emptyChunkedByteBuffer.toInputStream(dispose = false).close()
+    emptyChunkedByteBuffer.toInputStream(dispose = true).close()
+  }
+
+  test("chunks must be non-empty") {
+    intercept[IllegalArgumentException] {
+      new ChunkedByteBuffer(Array(ByteBuffer.allocate(0)))
+    }
+  }
+
+  test("getChunks() duplicates chunks") {
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
+    chunkedByteBuffer.getChunks().head.position(4)
+    assert(chunkedByteBuffer.getChunks().head.position() === 0)
+  }
+
+  test("copy() does not affect original buffer's position") {
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
+    chunkedByteBuffer.copy()
+    assert(chunkedByteBuffer.getChunks().head.position() === 0)
+  }
+
+  test("writeFully() does not affect original buffer's position") {
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
+    chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
+    assert(chunkedByteBuffer.getChunks().head.position() === 0)
+  }
+
+  test("toArray()") {
+    val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes))
+    assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array())
+  }
+
+  test("toArray() throws UnsupportedOperationException if size exceeds 2GB") {
+    val fourMegabyteBuffer = ByteBuffer.allocate(1024 * 1024 * 4)
+    fourMegabyteBuffer.limit(fourMegabyteBuffer.capacity())
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(1024)(fourMegabyteBuffer))
+    assert(chunkedByteBuffer.size === (1024L * 1024L * 1024L * 4L))
+    intercept[UnsupportedOperationException] {
+      chunkedByteBuffer.toArray
+    }
+  }
+
+  test("toInputStream()") {
+    val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
+    val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2))
+    assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit())
+
+    val inputStream = chunkedByteBuffer.toInputStream(dispose = false)
+    val bytesFromStream = new Array[Byte](chunkedByteBuffer.size.toInt)
+    ByteStreams.readFully(inputStream, bytesFromStream)
+    assert(bytesFromStream === bytes1.array() ++ bytes2.array())
+    assert(chunkedByteBuffer.getChunks().head.position() === 0)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 2e0c0596a75bb0b38a338ed5ca8667f1f47b67cf..edf5cd35e40eef8e411ce2868172008f2b16464c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -44,6 +44,7 @@ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util._
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   with PrivateMethodTester with ResetSystemProperties {
@@ -192,8 +193,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("a3").size === 0, "master was told about a3")
 
     // Drop a1 and a2 from memory; this should be reported back to the master
-    store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
-    store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer])
+    store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer])
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store")
     assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store")
     assert(master.getLocations("a1").size === 0, "master did not remove a1")
@@ -434,8 +435,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       t2.join()
       t3.join()
 
-      store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
-      store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer])
+      store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer])
       store.waitForAsyncReregister()
     }
   }
@@ -1253,9 +1254,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(12000)
     val memoryStore = store.memoryStore
     val blockId = BlockId("rdd_3_10")
-    var bytes: ByteBuffer = null
+    var bytes: ChunkedByteBuffer = null
     memoryStore.putBytes(blockId, 10000, () => {
-      bytes = ByteBuffer.allocate(10000)
+      bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000))
       bytes
     })
     assert(memoryStore.getSize(blockId) === 10000)
@@ -1364,7 +1365,7 @@ private object BlockManagerSuite {
 
     def dropFromMemoryIfExists(
         blockId: BlockId,
-        data: () => Either[Array[Any], ByteBuffer]): Unit = {
+        data: () => Either[Array[Any], ChunkedByteBuffer]): Unit = {
       store.blockInfoManager.lockForWriting(blockId).foreach { info =>
         val newEffectiveStorageLevel = store.dropFromMemory(blockId, data)
         if (newEffectiveStorageLevel.isValid) {
@@ -1394,7 +1395,9 @@ private object BlockManagerSuite {
     val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocalValues)
     val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
     val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
-    val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes)
+    val getLocalBytesAndReleaseLock: (BlockId) => Option[ChunkedByteBuffer] = {
+      wrapGet(store.getLocalBytes)
+    }
   }
 
 }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index 97e74fe7060029823986b5110063cf0ac34dca2f..9ed5016510d56435adf4e4fa7b24a50bba956e35 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -21,6 +21,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.util.Arrays
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 class DiskStoreSuite extends SparkFunSuite {
 
@@ -29,7 +30,7 @@ class DiskStoreSuite extends SparkFunSuite {
 
     // Create a non-trivial (not all zeros) byte array
     val bytes = Array.tabulate[Byte](1000)(_.toByte)
-    val byteBuffer = ByteBuffer.wrap(bytes)
+    val byteBuffer = new ChunkedByteBuffer(ByteBuffer.wrap(bytes))
 
     val blockId = BlockId("rdd_1_2")
     val diskBlockManager = new DiskBlockManager(new SparkConf(), deleteFilesOnStop = true)
@@ -44,9 +45,10 @@ class DiskStoreSuite extends SparkFunSuite {
     val notMapped = diskStoreNotMapped.getBytes(blockId)
 
     // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
-    assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
+    assert(notMapped.getChunks().forall(_.getClass.getName.endsWith("HeapByteBuffer")),
       "Expected HeapByteBuffer for un-mapped read")
-    assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
+    assert(mapped.getChunks().forall(_.isInstanceOf[MappedByteBuffer]),
+      "Expected MappedByteBuffer for mapped read")
 
     def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
       val array = new Array[Byte](in.remaining())
@@ -54,9 +56,7 @@ class DiskStoreSuite extends SparkFunSuite {
       array
     }
 
-    val mappedAsArray = arrayFromByteBuffer(mapped)
-    val notMappedAsArray = arrayFromByteBuffer(notMapped)
-    assert(Arrays.equals(mappedAsArray, bytes))
-    assert(Arrays.equals(notMappedAsArray, bytes))
+    assert(Arrays.equals(mapped.toArray, bytes))
+    assert(Arrays.equals(notMapped.toArray, bytes))
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index f811784b25c82ecd5cd953bfc98a32dc7dc85713..8625882b04421d6b666b8de9d884d634b39361bb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -28,11 +28,13 @@ import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming.util._
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
  * It contains information about the id of the blocks having this partition's data and
  * the corresponding record handle in the write ahead log that backs the partition.
+ *
  * @param index index of the partition
  * @param blockId id of the block having the partition data
  * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
@@ -59,7 +61,6 @@ class WriteAheadLogBackedBlockRDDPartition(
  * correctness, and it can be used in situations where it is known that the block
  * does not exist in the Spark executors (e.g. after a failed driver is restarted).
  *
- *
  * @param sc SparkContext
  * @param _blockIds Ids of the blocks that contains this RDD's data
  * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
@@ -156,7 +157,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
       logInfo(s"Read partition data of $this from write ahead log, record handle " +
         partition.walRecordHandle)
       if (storeInBlockManager) {
-        blockManager.putBytes(blockId, dataRead, storageLevel)
+        blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel)
         logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
         dataRead.rewind()
       }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 4880884b0509dcb8a4dd057a3965cc62b28992e0..6d4f4b99c175fe8ef6bb0620411dffc30fed9c49 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -30,6 +30,7 @@ import org.apache.spark.storage._
 import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
 import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
@@ -84,7 +85,8 @@ private[streaming] class BlockManagerBasedBlockHandler(
         numRecords = countIterator.count
         putResult
       case ByteBufferBlock(byteBuffer) =>
-        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
+        blockManager.putBytes(
+          blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true)
       case o =>
         throw new SparkException(
           s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
@@ -178,15 +180,18 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
         numRecords = countIterator.count
         serializedBlock
       case ByteBufferBlock(byteBuffer) =>
-        byteBuffer
+        new ChunkedByteBuffer(byteBuffer.duplicate())
       case _ =>
         throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
     }
 
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
-      val putSucceeded =
-        blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
+      val putSucceeded = blockManager.putBytes(
+        blockId,
+        serializedBlock,
+        effectiveStorageLevel,
+        tellMaster = true)
       if (!putSucceeded) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
@@ -195,7 +200,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
 
     // Store the block in write ahead log
     val storeInWriteAheadLogFuture = Future {
-      writeAheadLog.write(serializedBlock, clock.getTimeMillis())
+      writeAheadLog.write(serializedBlock.toByteBuffer, clock.getTimeMillis())
     }
 
     // Combine the futures, wait for both to complete, and return the write ahead log record handle
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 2d509af85ae339234fd36019e0be81371e4142b7..76f67ed601ea5999d5bcb3551a828e02759bf922 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -339,7 +339,7 @@ class ReceivedBlockHandlerSuite
 
     storeAndVerify(blocks.map { b => IteratorBlock(b.toIterator) })
     storeAndVerify(blocks.map { b => ArrayBufferBlock(new ArrayBuffer ++= b) })
-    storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b)) })
+    storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) })
   }
 
   /** Test error handling when blocks cannot be stored */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index 79ac833c1846b8975ac3e93733024cd529e0abea..c4bf42d0f272dbbe53d0c844345edfc599e5fdad 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -223,7 +223,7 @@ class WriteAheadLogBackedBlockRDDSuite
     require(blockData.size === blockIds.size)
     val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
     val segments = blockData.zip(blockIds).map { case (data, id) =>
-      writer.write(blockManager.dataSerialize(id, data.iterator))
+      writer.write(blockManager.dataSerialize(id, data.iterator).toByteBuffer)
     }
     writer.close()
     segments
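End to end, the streaming write-ahead-log handler above now keeps the serialized block in chunked form for the BlockManager and flattens it only for the log. A condensed sketch of that path, assuming `blockManager`, `writeAheadLog`, `clock`, `blockId`, `effectiveStorageLevel`, and a `records` iterator are in scope as they are in `WriteAheadLogBasedBlockHandler` (the `records` name is illustrative):

```scala
import org.apache.spark.util.io.ChunkedByteBuffer

// dataSerialize() now produces a ChunkedByteBuffer (4 MB chunks) instead of one big ByteBuffer.
val serializedBlock: ChunkedByteBuffer = blockManager.dataSerialize(blockId, records.iterator)

// The BlockManager accepts the chunked form directly...
blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)

// ...while the write-ahead log still takes a single ByteBuffer, hence the toByteBuffer bridge.
val recordHandle = writeAheadLog.write(serializedBlock.toByteBuffer, clock.getTimeMillis())
```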