diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 16d67cbfca80b89f9d0ee444916ff2a9f2f1b3ed..5048c7dab240ba2dc307124de17a4d316dbaa5eb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream, OutputStream}
+import java.io._
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -489,16 +489,17 @@ private[spark] class BlockManager(
         if (level.useOffHeap) {
           logDebug(s"Getting block $blockId from ExternalBlockStore")
           if (externalBlockStore.contains(blockId)) {
-            externalBlockStore.getBytes(blockId) match {
-              case Some(bytes) =>
-                if (!asBlockResult) {
-                  return Some(bytes)
-                } else {
-                  return Some(new BlockResult(
-                    dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
-                }
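+            // Fetch deserialized values when a BlockResult is requested, otherwise the raw
+            // serialized bytes; this lets the external store deserialize directly from a stream.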
+            val result = if (asBlockResult) {
+              externalBlockStore.getValues(blockId)
+                .map(new BlockResult(_, DataReadMethod.Memory, info.size))
+            } else {
+              externalBlockStore.getBytes(blockId)
+            }
+            result match {
+              case Some(_) =>
+                return result
               case None =>
-                logDebug(s"Block $blockId not found in externalBlockStore")
+                logDebug(s"Block $blockId not found in ExternalBlockStore")
             }
           }
         }
@@ -1206,8 +1207,19 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-    serializer.newInstance().deserializeStream(stream).asIterator
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+  }
+
+  /**
+   * Deserializes an InputStream into an iterator of values and disposes of the stream when
+   * the end of the iterator is reached.
+   */
+  def dataDeserializeStream(
+      blockId: BlockId,
+      inputStream: InputStream,
+      serializer: Serializer = defaultSerializer): Iterator[Any] = {
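+    // Buffer the raw input stream and wrap it for decompression before deserializing, so the
+    // serializer does not issue many small reads against the underlying source.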
+    val stream = new BufferedInputStream(inputStream)
+    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
   }
 
   def stop(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
index 8964762df6af3f5cf26b69f98847587305270955..f39325a12d244cf0a5f66a1208f276bf13d8b391 100644
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
@@ -32,6 +32,8 @@ import java.nio.ByteBuffer
  */
 private[spark] abstract class ExternalBlockManager {
 
+  protected var blockManager: BlockManager = _
+
   override def toString: String = {"External Block Store"}
 
   /**
@@ -41,7 +43,9 @@ private[spark] abstract class ExternalBlockManager {
    *
    * @throws java.io.IOException if there is any file system failure during the initialization.
    */
-  def init(blockManager: BlockManager, executorId: String): Unit
+  def init(blockManager: BlockManager, executorId: String): Unit = {
+    this.blockManager = blockManager
+  }
 
   /**
    * Drop the block from underlying external block store, if it exists..
@@ -73,6 +77,11 @@ private[spark] abstract class ExternalBlockManager {
    */
   def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
 
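+  /**
+   * Put the given block values into the underlying external block store. This default
+   * implementation serializes the values through the BlockManager and delegates to putBytes();
+   * implementations that can write to an output stream directly may override it.
+   *
+   * @throws java.io.IOException if there is any file system failure in putting the block.
+   */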
+  def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
+    val bytes = blockManager.dataSerialize(blockId, values)
+    putBytes(blockId, bytes)
+  }
+
   /**
    * Retrieve the block bytes.
    * @return Some(ByteBuffer) if the block bytes is successfully retrieved
@@ -82,6 +91,17 @@ private[spark] abstract class ExternalBlockManager {
    */
   def getBytes(blockId: BlockId): Option[ByteBuffer]
 
+  /**
+   * Retrieve the block data.
+   * @return Some(Iterator[Any]) if the block data is successfully retrieved
+   *         None if the block does not exist in the external block store.
+   *
+   * @throws java.io.IOException if there is any file system failure in getting the block.
+   */
+  def getValues(blockId: BlockId): Option[Iterator[_]] = {
+    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+  }
+
   /**
    * Get the size of the block saved in the underlying external block store,
    * which is saved before by putBytes.
diff --git a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
index 0bf770306ae9b3f25fc4c1b4050db15dab5c37d1..291394ed348167895e5d4f89b5225b537280f09f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.storage
 
 import java.nio.ByteBuffer
+
+import scala.util.control.NonFatal
+
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
-import scala.util.control.NonFatal
 
 
 /**
@@ -40,7 +42,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
     } catch {
       case NonFatal(t) =>
-        logError(s"error in getSize from $blockId", t)
+        logError(s"Error in getSize($blockId)", t)
         0L
     }
   }
@@ -54,7 +56,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putIterator(blockId, values.toIterator, level, returnValues)
+    putIntoExternalBlockStore(blockId, values.toIterator, returnValues)
   }
 
   override def putIterator(
@@ -62,42 +64,70 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    logDebug(s"Attempting to write values for block $blockId")
-    val bytes = blockManager.dataSerialize(blockId, values)
-    putIntoExternalBlockStore(blockId, bytes, returnValues)
+    putIntoExternalBlockStore(blockId, values, returnValues)
   }
 
   private def putIntoExternalBlockStore(
       blockId: BlockId,
-      bytes: ByteBuffer,
+      values: Iterator[_],
       returnValues: Boolean): PutResult = {
-    // So that we do not modify the input offsets !
-    // duplicate does not copy buffer, so inexpensive
-    val byteBuffer = bytes.duplicate()
-    byteBuffer.rewind()
-    logDebug(s"Attempting to put block $blockId into ExtBlk store")
+    logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
     // we should never hit here if externalBlockManager is None. Handle it anyway for safety.
     try {
       val startTime = System.currentTimeMillis
       if (externalBlockManager.isDefined) {
-        externalBlockManager.get.putBytes(blockId, bytes)
+        externalBlockManager.get.putValues(blockId, values)
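+        // The values iterator has been consumed by putValues, so query the store for the
+        // stored size and, when returnValues is set, read the values back from the store.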
+        val size = getSize(blockId)
+        val data = if (returnValues) {
+          Left(getValues(blockId).get)
+        } else {
+          null
+        }
         val finishTime = System.currentTimeMillis
         logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
-          blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
+          blockId, Utils.bytesToString(size), finishTime - startTime))
+        PutResult(size, data)
+      } else {
+        logError(s"Error in putValues($blockId): no ExternalBlockManager has been configured")
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
+      }
+    } catch {
+      case NonFatal(t) =>
+        logError(s"Error in putValues($blockId)", t)
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
+    }
+  }
 
-        if (returnValues) {
-          PutResult(bytes.limit(), Right(bytes.duplicate()))
+  private def putIntoExternalBlockStore(
+      blockId: BlockId,
+      bytes: ByteBuffer,
+      returnValues: Boolean): PutResult = {
+    logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
+    // we should never hit here if externalBlockManager is None. Handle it anyway for safety.
+    try {
+      val startTime = System.currentTimeMillis
+      if (externalBlockManager.isDefined) {
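+        // Duplicate the buffer (cheap: no data copy) so the caller's position and limit are
+        // left untouched, then rewind the copy before handing it to the external store.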
+        val byteBuffer = bytes.duplicate()
+        byteBuffer.rewind()
+        externalBlockManager.get.putBytes(blockId, byteBuffer)
+        val size = bytes.limit()
+        val data = if (returnValues) {
+          Right(bytes)
         } else {
-          PutResult(bytes.limit(), null)
+          null
         }
+        val finishTime = System.currentTimeMillis
+        logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
+          blockId, Utils.bytesToString(size), finishTime - startTime))
+        PutResult(size, data)
       } else {
-        logError(s"error in putBytes $blockId")
-        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+        logError(s"Error in putBytes($blockId): no ExternalBlockManager has been configured")
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
       }
     } catch {
       case NonFatal(t) =>
-        logError(s"error in putBytes $blockId", t)
-        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+        logError(s"Error in putBytes($blockId)", t)
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
     }
   }
 
@@ -107,13 +137,19 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
     } catch {
       case NonFatal(t) =>
-        logError(s"error in removing $blockId", t)
+        logError(s"Error in removeBlock($blockId)", t)
         true
     }
   }
 
   override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+    try {
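+      // Delegate to the ExternalBlockManager so implementations can deserialize from a stream
+      // instead of first materializing the bytes in memory.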
+      externalBlockManager.flatMap(_.getValues(blockId))
+    } catch {
+      case NonFatal(t) =>
+        logError(s"Error in getValues($blockId)", t)
+        None
+    }
   }
 
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -121,7 +157,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       externalBlockManager.flatMap(_.getBytes(blockId))
     } catch {
       case NonFatal(t) =>
-        logError(s"error in getBytes from $blockId", t)
+        logError(s"Error in getBytes($blockId)", t)
         None
     }
   }
@@ -130,13 +166,13 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
     try {
       val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
       if (!ret) {
-        logInfo(s"remove block $blockId")
+        logInfo(s"Remove block $blockId")
         blockManager.removeBlock(blockId, true)
       }
       ret
     } catch {
       case NonFatal(t) =>
-        logError(s"error in getBytes from $blockId", t)
+        logError(s"Error in getBytes($blockId)", t)
         false
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index bdc6276e4191575d7f522e95aab9628f8903dfe5..fb4ba0eac9d9a17d3b2e6aa4947e8463e0edfe40 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -22,7 +22,10 @@ import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
 import java.util.{Date, Random}
 
+import scala.util.control.NonFatal
+
 import com.google.common.io.ByteStreams
+
 import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
 import tachyon.TachyonURI
 
@@ -38,7 +41,6 @@ import org.apache.spark.util.Utils
  */
 private[spark] class TachyonBlockManager() extends ExternalBlockManager with Logging {
 
-  var blockManager: BlockManager =_
   var rootDirs: String = _
   var master: String = _
   var client: tachyon.client.TachyonFS = _
@@ -52,7 +54,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
 
 
   override def init(blockManager: BlockManager, executorId: String): Unit = {
-    this.blockManager = blockManager
+    super.init(blockManager, executorId)
     val storeDir = blockManager.conf.get(ExternalBlockStore.BASE_DIR, "/tmp_spark_tachyon")
     val appFolderName = blockManager.conf.get(ExternalBlockStore.FOLD_NAME)
 
@@ -95,8 +97,29 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
   override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
     val file = getFile(blockId)
     val os = file.getOutStream(WriteType.TRY_CACHE)
-    os.write(bytes.array())
-    os.close()
+    try {
+      os.write(bytes.array())
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to put bytes of block $blockId into Tachyon", e)
+        os.cancel()
+    } finally {
+      os.close()
+    }
+  }
+
+  override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
+    val file = getFile(blockId)
+    val os = file.getOutStream(WriteType.TRY_CACHE)
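+    // Serialize the iterator straight into the Tachyon output stream so the block never has
+    // to be materialized as a single ByteBuffer in memory.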
+    try {
+      blockManager.dataSerializeStream(blockId, os, values)
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to put values of block $blockId into Tachyon", e)
+        os.cancel()
+    } finally {
+      os.close()
+    }
   }
 
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -105,21 +128,31 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
       return None
     }
     val is = file.getInStream(ReadType.CACHE)
-    assert (is != null)
     try {
       val size = file.length
       val bs = new Array[Byte](size.asInstanceOf[Int])
       ByteStreams.readFully(is, bs)
       Some(ByteBuffer.wrap(bs))
     } catch {
-      case ioe: IOException =>
-        logWarning(s"Failed to fetch the block $blockId from Tachyon", ioe)
+      case NonFatal(e) =>
+        logWarning(s"Failed to get bytes of block $blockId from Tachyon", e)
         None
     } finally {
       is.close()
     }
   }
 
+  override def getValues(blockId: BlockId): Option[Iterator[_]] = {
+    val file = getFile(blockId)
+    if (file == null || file.getLocationHosts().size() == 0) {
+      return None
+    }
+    val is = file.getInStream(ReadType.CACHE)
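+    // getInStream may return null; if it does not, deserialize lazily so bytes are pulled
+    // from Tachyon only as the returned iterator is consumed.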
+    Option(is).map { is =>
+      blockManager.dataDeserializeStream(blockId, is)
+    }
+  }
+
   override def getSize(blockId: BlockId): Long = {
     getFile(blockId.name).length
   }
@@ -184,7 +217,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
             tachyonDir = client.getFile(path)
           }
         } catch {
-          case e: Exception =>
+          case NonFatal(e) =>
             logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
         }
       }
@@ -206,7 +239,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
           Utils.deleteRecursively(tachyonDir, client)
         }
       } catch {
-        case e: Exception =>
+        case NonFatal(e) =>
           logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
       }
     }