diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 272d7cdad3bc213b1202b5d249015d957319181b..41441720a7c8f81004fb897763a6d8ae2d29e5d9 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -86,10 +86,13 @@ object SparkEnv extends Logging {
     }
 
     val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")
-    
-    val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
+
+    val masterIp: String = System.getProperty("spark.master.host", "localhost")
+    val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+    val blockManagerMaster = new BlockManagerMaster(
+      actorSystem, isMaster, isLocal, masterIp, masterPort)
     val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
-    
+
     val connectionManager = blockManager.connectionManager
 
     val broadcastManager = new BroadcastManager(isMaster)
@@ -104,7 +107,7 @@ object SparkEnv extends Logging {
 
     val shuffleFetcher = instantiateClass[ShuffleFetcher](
       "spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher")
-    
+
     val httpFileServer = new HttpFileServer()
     httpFileServer.initialize()
     System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index df295b18207b3253d303cee2e3d51730789ec953..7a8ac10cdd88e51ebd06588ca6b7801cc9a693d1 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,59 +1,39 @@
 package spark.storage
 
-import akka.actor.{ActorSystem, Cancellable}
+import java.io.{InputStream, OutputStream}
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.JavaConversions._
+
+import akka.actor.{ActorSystem, Cancellable, Props}
 import akka.dispatch.{Await, Future}
 import akka.util.Duration
 import akka.util.duration._
 
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import java.io.{InputStream, OutputStream, Externalizable, ObjectInput, ObjectOutput}
-import java.nio.{MappedByteBuffer, ByteBuffer}
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
-import scala.collection.JavaConversions._
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
 
 import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
 import spark.network._
 import spark.serializer.Serializer
-import spark.util.ByteBufferInputStream
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import sun.nio.ch.DirectBuffer
-
-
-private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
-  def this() = this(null, 0)  // For deserialization only
-
-  def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
-
-  override def writeExternal(out: ObjectOutput) {
-    out.writeUTF(ip)
-    out.writeInt(port)
-  }
+import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
 
-  override def readExternal(in: ObjectInput) {
-    ip = in.readUTF()
-    port = in.readInt()
-  }
-
-  override def toString = "BlockManagerId(" + ip + ", " + port + ")"
-
-  override def hashCode = ip.hashCode * 41 + port
+import sun.nio.ch.DirectBuffer
 
-  override def equals(that: Any) = that match {
-    case id: BlockManagerId => port == id.port && ip == id.ip
-    case _ => false
-  }
-}
 
 private[spark]
 case class BlockException(blockId: String, message: String, ex: Exception = null)
 extends Exception(message)
 
 private[spark]
-class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
-                   val serializer: Serializer, maxMemory: Long)
+class BlockManager(
+    actorSystem: ActorSystem,
+    val master: BlockManagerMaster,
+    val serializer: Serializer,
+    maxMemory: Long)
   extends Logging {
 
   class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@@ -79,7 +59,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
     }
   }
 
-  private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
+  private val blockInfo = new TimeStampedHashMap[String, BlockInfo]
 
   private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
   private[storage] val diskStore: BlockStore =
@@ -110,16 +90,20 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
 
   val host = System.getProperty("spark.hostname", Utils.localHostName())
 
+  val slaveActor = master.actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
+    name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
+
   @volatile private var shuttingDown = false
 
   private def heartBeat() {
-    if (!master.mustHeartBeat(HeartBeat(blockManagerId))) {
+    if (!master.sendHeartBeat(blockManagerId)) {
       reregister()
     }
   }
 
   var heartBeatTask: Cancellable = null
 
+  val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
   initialize()
 
   /**
@@ -134,8 +118,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
    * BlockManagerWorker actor.
    */
   private def initialize() {
-    master.mustRegisterBlockManager(
-      RegisterBlockManager(blockManagerId, maxMemory))
+    master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
@@ -156,8 +139,8 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
    */
   private def reportAllBlocks() {
     logInfo("Reporting " + blockInfo.size + " blocks to the master.")
-    for (blockId <- blockInfo.keys) {
-      if (!tryToReportBlockStatus(blockId)) {
+    for ((blockId, info) <- blockInfo) {
+      if (!tryToReportBlockStatus(blockId, info)) {
         logError("Failed to report " + blockId + " to master; giving up.")
         return
       }
@@ -171,26 +154,22 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
   def reregister() {
     // TODO: We might need to rate limit reregistering.
     logInfo("BlockManager reregistering with master")
-    master.mustRegisterBlockManager(
-      RegisterBlockManager(blockManagerId, maxMemory))
+    master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
     reportAllBlocks()
   }
 
   /**
    * Get storage level of local block. If no info exists for the block, then returns null.
    */
-  def getLevel(blockId: String): StorageLevel = {
-    val info = blockInfo.get(blockId)
-    if (info != null) info.level else null
-  }
+  def getLevel(blockId: String): StorageLevel = blockInfo.get(blockId).map(_.level).orNull
 
   /**
    * Tell the master about the current storage status of a block. This will send a block update
    * message reflecting the current status, *not* the desired storage level in its block info.
    * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
    */
-  def reportBlockStatus(blockId: String) {
-    val needReregister = !tryToReportBlockStatus(blockId)
+  def reportBlockStatus(blockId: String, info: BlockInfo) {
+    val needReregister = !tryToReportBlockStatus(blockId, info)
     if (needReregister) {
       logInfo("Got told to reregister updating block " + blockId)
       // Reregistering will report our new block for free.
@@ -200,33 +179,27 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
   }
 
   /**
-   * Actually send a BlockUpdate message. Returns the mater's response, which will be true if the
-   * block was successfully recorded and false if the slave needs to re-register.
+   * Actually send an UpdateBlockInfo message. Returns the master's response,
+   * which will be true if the block was successfully recorded and false if
+   * the slave needs to re-register.
    */
-  private def tryToReportBlockStatus(blockId: String): Boolean = {
-    val (curLevel, inMemSize, onDiskSize, tellMaster) = blockInfo.get(blockId) match {
-      case null =>
-        (StorageLevel.NONE, 0L, 0L, false)
-      case info =>
-        info.synchronized {
-          info.level match {
-            case null =>
-              (StorageLevel.NONE, 0L, 0L, false)
-            case level =>
-              val inMem = level.useMemory && memoryStore.contains(blockId)
-              val onDisk = level.useDisk && diskStore.contains(blockId)
-              (
-                new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
-                if (inMem) memoryStore.getSize(blockId) else 0L,
-                if (onDisk) diskStore.getSize(blockId) else 0L,
-                info.tellMaster
-              )
-          }
-        }
+  private def tryToReportBlockStatus(blockId: String, info: BlockInfo): Boolean = {
+    val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
+      info.level match {
+        case null =>
+          (StorageLevel.NONE, 0L, 0L, false)
+        case level =>
+          val inMem = level.useMemory && memoryStore.contains(blockId)
+          val onDisk = level.useDisk && diskStore.contains(blockId)
+          val storageLevel = new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
+          val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
+          val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
+          (storageLevel, memSize, diskSize, info.tellMaster)
+      }
     }
 
     if (tellMaster) {
-      master.mustBlockUpdate(BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
+      master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
     } else {
       true
     }
@@ -238,7 +211,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
    */
   def getLocations(blockId: String): Seq[String] = {
     val startTimeMs = System.currentTimeMillis
-    var managers = master.mustGetLocations(GetLocations(blockId))
+    var managers = master.getLocations(blockId)
     val locations = managers.map(_.ip)
     logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs))
     return locations
@@ -249,8 +222,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
    */
   def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
     val startTimeMs = System.currentTimeMillis
-    val locations = master.mustGetLocationsMultipleBlockIds(
-      GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray
+    val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray
     logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
     return locations
   }
@@ -272,7 +244,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
       }
     }
 
-    val info = blockInfo.get(blockId)
+    val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
         info.waitForReady() // In case the block is still being put() by another thread
@@ -357,7 +329,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
       }
     }
 
-    val info = blockInfo.get(blockId)
+    val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
         info.waitForReady() // In case the block is still being put() by another thread
@@ -413,7 +385,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
     }
     logDebug("Getting remote block " + blockId)
     // Get locations of block
-    val locations = master.mustGetLocations(GetLocations(blockId))
+    val locations = master.getLocations(blockId)
 
     // Get block from remote locations
     for (loc <- locations) {
@@ -615,7 +587,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
       throw new IllegalArgumentException("Storage level is null or invalid")
     }
 
-    val oldBlock = blockInfo.get(blockId)
+    val oldBlock = blockInfo.get(blockId).orNull
     if (oldBlock != null) {
       logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
       oldBlock.waitForReady()
@@ -670,7 +642,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
       // and tell the master about it.
       myInfo.markReady(size)
       if (tellMaster) {
-        reportBlockStatus(blockId)
+        reportBlockStatus(blockId, myInfo)
       }
     }
     logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -716,7 +688,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
       throw new IllegalArgumentException("Storage level is null or invalid")
     }
 
-    if (blockInfo.containsKey(blockId)) {
+    if (blockInfo.contains(blockId)) {
       logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
       return
     }
@@ -757,7 +729,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
       // and tell the master about it.
       myInfo.markReady(bytes.limit)
       if (tellMaster) {
-        reportBlockStatus(blockId)
+        reportBlockStatus(blockId, myInfo)
       }
     }
 
@@ -791,7 +763,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
     val tLevel: StorageLevel =
       new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
     if (cachedPeers == null) {
-      cachedPeers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
+      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
     }
     for (peer: BlockManagerId <- cachedPeers) {
       val start = System.nanoTime
@@ -838,7 +810,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
    */
   def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
     logInfo("Dropping block " + blockId + " from memory")
-    val info = blockInfo.get(blockId)
+    val info = blockInfo.get(blockId).orNull
     if (info != null)  {
       info.synchronized {
         val level = info.level
@@ -851,9 +823,12 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
               diskStore.putBytes(blockId, bytes, level)
           }
         }
-        memoryStore.remove(blockId)
+        val blockWasRemoved = memoryStore.remove(blockId)
+        if (!blockWasRemoved) {
+          logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
+        }
         if (info.tellMaster) {
-          reportBlockStatus(blockId)
+          reportBlockStatus(blockId, info)
         }
         if (!level.useDisk) {
           // The block is completely gone from this node; forget it so we can put() it again later.
@@ -865,6 +840,53 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
     }
   }
 
+  /**
+   * Remove a block from both memory and disk.
+   */
+  def removeBlock(blockId: String) {
+    logInfo("Removing block " + blockId)
+    val info = blockInfo.get(blockId).orNull
+    if (info != null) info.synchronized {
+      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+      val removedFromMemory = memoryStore.remove(blockId)
+      val removedFromDisk = diskStore.remove(blockId)
+      if (!removedFromMemory && !removedFromDisk) {
+        logWarning("Block " + blockId + " could not be removed as it was not found in either " +
+          "the disk or memory store")
+      }
+      blockInfo.remove(blockId)
+      if (info.tellMaster) {
+        reportBlockStatus(blockId, info)
+      }
+    } else {
+      // The block has already been removed; do nothing.
+      logWarning("Asked to remove block " + blockId + ", which does not exist")
+    }
+  }
+
+  def dropOldBlocks(cleanupTime: Long) {
+    logInfo("Dropping blocks older than " + cleanupTime)
+    val iterator = blockInfo.internalMap.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
+      if (time < cleanupTime) {
+        info.synchronized {
+          val level = info.level
+          if (level.useMemory) {
+            memoryStore.remove(id)
+          }
+          if (level.useDisk) {
+            diskStore.remove(id)
+          }
+          iterator.remove()
+          logInfo("Dropped block " + id)
+        }
+        reportBlockStatus(id, info)
+      }
+    }
+  }
+
   def shouldCompress(blockId: String): Boolean = {
     if (blockId.startsWith("shuffle_")) {
       compressShuffle
@@ -914,6 +936,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
       heartBeatTask.cancel()
     }
     connectionManager.stop()
+    master.actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()
     diskStore.clear()
@@ -923,6 +946,9 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
 
 private[spark]
 object BlockManager extends Logging {
+
+  val ID_GENERATOR = new IdGenerator
+
   def getMaxMemoryFromSystemProperties: Long = {
     val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
     (Runtime.getRuntime.maxMemory * memoryFraction).toLong
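
Note on the blockInfo change above: switching from ConcurrentHashMap to
TimeStampedHashMap is what enables the new MetadataCleaner/dropOldBlocks path,
since every entry carries its insertion time. A minimal sketch of the contract that
dropOldBlocks relies on (the real spark.util.TimeStampedHashMap is richer):

    import java.util.concurrent.ConcurrentHashMap

    // Each entry keeps (value, insertionTimeMs), the shape dropOldBlocks reads
    // back via entry.getValue._1 and entry.getValue._2.
    class TimeStampedMapSketch[K, V] {
      val internalMap = new ConcurrentHashMap[K, (V, Long)]()

      def put(key: K, value: V) {
        internalMap.put(key, (value, System.currentTimeMillis()))
      }

      def get(key: K): Option[V] = Option(internalMap.get(key)).map(_._1)

      // Drop every entry inserted before the cutoff, mirroring what
      // MetadataCleaner triggers through the dropOldBlocks callback.
      def clearOldValues(cutoffTimeMs: Long) {
        val iterator = internalMap.entrySet().iterator()
        while (iterator.hasNext) {
          if (iterator.next().getValue._2 < cutoffTimeMs) {
            iterator.remove()
          }
        }
      }
    }
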
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
new file mode 100644
index 0000000000000000000000000000000000000000..488679f0496b42d4cbb59ba6205583a4992584b0
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -0,0 +1,48 @@
+package spark.storage
+
+import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
+import java.util.concurrent.ConcurrentHashMap
+
+
+private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
+  def this() = this(null, 0)  // For deserialization only
+
+  def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
+
+  override def writeExternal(out: ObjectOutput) {
+    out.writeUTF(ip)
+    out.writeInt(port)
+  }
+
+  override def readExternal(in: ObjectInput) {
+    ip = in.readUTF()
+    port = in.readInt()
+  }
+
+  @throws(classOf[IOException])
+  private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
+
+  override def toString = "BlockManagerId(" + ip + ", " + port + ")"
+
+  override def hashCode = ip.hashCode * 41 + port
+
+  override def equals(that: Any) = that match {
+    case id: BlockManagerId => port == id.port && ip == id.ip
+    case _ => false
+  }
+}
+
+
+private[spark] object BlockManagerId {
+
+  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
+
+  def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
+    if (blockManagerIdCache.containsKey(id)) {
+      blockManagerIdCache.get(id)
+    } else {
+      blockManagerIdCache.put(id, id)
+      id
+    }
+  }
+}
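
The readResolve hook above interns deserialized ids through the cache, so a master
that receives many block updates holds a single BlockManagerId instance per
(ip, port) pair. A hedged round-trip check (the object name is illustrative; it
must live in package spark.storage because BlockManagerId is private[spark]):

    package spark.storage

    import java.io._

    object BlockManagerIdCacheCheck {
      def roundTrip(id: BlockManagerId): BlockManagerId = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        out.writeObject(id)
        out.close()
        val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
        in.readObject().asInstanceOf[BlockManagerId]
      }

      def main(args: Array[String]) {
        val a = roundTrip(new BlockManagerId("10.0.0.1", 7077))
        val b = roundTrip(new BlockManagerId("10.0.0.1", 7077))
        println(a eq b)  // expected: true, both resolve through getCachedBlockManagerId
      }
    }
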
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 0a4e68f43769c6ced41060c14df5a8cc07298ba5..a3d8671834dbc676c83e4a739c40462127a93e93 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -1,676 +1,167 @@
 package spark.storage
 
-import java.io._
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
-import akka.actor._
-import akka.dispatch._
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.dispatch.Await
 import akka.pattern.ask
-import akka.remote._
 import akka.util.{Duration, Timeout}
 import akka.util.duration._
 
 import spark.{Logging, SparkException, Utils}
 
 
-private[spark]
-sealed trait ToBlockManagerMaster
-
-private[spark]
-case class RegisterBlockManager(
-    blockManagerId: BlockManagerId,
-    maxMemSize: Long)
-  extends ToBlockManagerMaster
-
-private[spark]
-case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
-private[spark]
-class BlockUpdate(
-    var blockManagerId: BlockManagerId,
-    var blockId: String,
-    var storageLevel: StorageLevel,
-    var memSize: Long,
-    var diskSize: Long)
-  extends ToBlockManagerMaster
-  with Externalizable {
-
-  def this() = this(null, null, null, 0, 0)  // For deserialization only
-
-  override def writeExternal(out: ObjectOutput) {
-    blockManagerId.writeExternal(out)
-    out.writeUTF(blockId)
-    storageLevel.writeExternal(out)
-    out.writeInt(memSize.toInt)
-    out.writeInt(diskSize.toInt)
-  }
-
-  override def readExternal(in: ObjectInput) {
-    blockManagerId = new BlockManagerId()
-    blockManagerId.readExternal(in)
-    blockId = in.readUTF()
-    storageLevel = new StorageLevel()
-    storageLevel.readExternal(in)
-    memSize = in.readInt()
-    diskSize = in.readInt()
-  }
-}
-
-private[spark]
-object BlockUpdate {
-  def apply(blockManagerId: BlockManagerId,
-      blockId: String,
-      storageLevel: StorageLevel,
-      memSize: Long,
-      diskSize: Long): BlockUpdate = {
-    new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
-  }
-
-  // For pattern-matching
-  def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
-    Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
-  }
-}
-
-private[spark]
-case class GetLocations(blockId: String) extends ToBlockManagerMaster
-
-private[spark]
-case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-
-private[spark]
-case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-
-private[spark]
-case class RemoveHost(host: String) extends ToBlockManagerMaster
-
-private[spark]
-case object StopBlockManagerMaster extends ToBlockManagerMaster
-
-private[spark]
-case object GetMemoryStatus extends ToBlockManagerMaster
-
-private[spark]
-case object ExpireDeadHosts extends ToBlockManagerMaster
-
-
-private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
-
-  class BlockManagerInfo(
-      val blockManagerId: BlockManagerId,
-      timeMs: Long,
-      val maxMem: Long) {
-    private var _lastSeenMs = timeMs
-    private var _remainingMem = maxMem
-    private val _blocks = new JHashMap[String, StorageLevel]
-
-    logInfo("Registering block manager %s:%d with %s RAM".format(
-      blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
-
-    def updateLastSeenMs() {
-      _lastSeenMs = System.currentTimeMillis()
-    }
-
-    def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
-      : Unit = synchronized {
-
-      updateLastSeenMs()
-
-      if (_blocks.containsKey(blockId)) {
-        // The block exists on the slave already.
-        val originalLevel: StorageLevel = _blocks.get(blockId)
-
-        if (originalLevel.useMemory) {
-          _remainingMem += memSize
-        }
-      }
-
-      if (storageLevel.isValid) {
-        // isValid means it is either stored in-memory or on-disk.
-        _blocks.put(blockId, storageLevel)
-        if (storageLevel.useMemory) {
-          _remainingMem -= memSize
-          logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
-            blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
-            Utils.memoryBytesToString(_remainingMem)))
-        }
-        if (storageLevel.useDisk) {
-          logInfo("Added %s on disk on %s:%d (size: %s)".format(
-            blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
-        }
-      } else if (_blocks.containsKey(blockId)) {
-        // If isValid is not true, drop the block.
-        val originalLevel: StorageLevel = _blocks.get(blockId)
-        _blocks.remove(blockId)
-        if (originalLevel.useMemory) {
-          _remainingMem += memSize
-          logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
-            blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
-            Utils.memoryBytesToString(_remainingMem)))
-        }
-        if (originalLevel.useDisk) {
-          logInfo("Removed %s on %s:%d on disk (size: %s)".format(
-            blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
-        }
-      }
-    }
-
-    def remainingMem: Long = _remainingMem
-
-    def lastSeenMs: Long = _lastSeenMs
-
-    def blocks: JHashMap[String, StorageLevel] = _blocks
-
-    override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
-
-    def clear() {
-      _blocks.clear()
-    }
-  }
-
-  private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
-  private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
-  private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
-
-  initLogging()
-
-  val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
-    "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
-
-  val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
-    "5000").toLong
-
-  var timeoutCheckingTask: Cancellable = null
+private[spark] class BlockManagerMaster(
+    val actorSystem: ActorSystem,
+    isMaster: Boolean,
+    isLocal: Boolean,
+    masterIp: String,
+    masterPort: Int)
+  extends Logging {
 
-  override def preStart() {
-    if (!BlockManager.getDisableHeartBeatsForTesting) {
-      timeoutCheckingTask = context.system.scheduler.schedule(
-        0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
-    }
-    super.preStart()
-  }
+  val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
+  val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
 
-  def removeBlockManager(blockManagerId: BlockManagerId) {
-    val info = blockManagerInfo(blockManagerId)
-    blockManagerIdByHost.remove(blockManagerId.ip)
-    blockManagerInfo.remove(blockManagerId)
-    var iterator = info.blocks.keySet.iterator
-    while (iterator.hasNext) {
-      val blockId = iterator.next
-      val locations = blockInfo.get(blockId)._2
-      locations -= blockManagerId
-      if (locations.size == 0) {
-        blockInfo.remove(locations)
-      }
-    }
-  }
-
-  def expireDeadHosts() {
-    logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
-    val now = System.currentTimeMillis()
-    val minSeenTime = now - slaveTimeout
-    val toRemove = new HashSet[BlockManagerId]
-    for (info <- blockManagerInfo.values) {
-      if (info.lastSeenMs < minSeenTime) {
-        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
-        toRemove += info.blockManagerId
-      }
-    }
-    // TODO: Remove corresponding block infos
-    toRemove.foreach(removeBlockManager)
-  }
-
-  def removeHost(host: String) {
-    logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
-    logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
-    blockManagerIdByHost.get(host).foreach(removeBlockManager)
-    logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
-    sender ! true
-  }
+  val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
+  val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
+  val DEFAULT_MANAGER_IP: String = Utils.localHostName()
 
-  def heartBeat(blockManagerId: BlockManagerId) {
-    if (!blockManagerInfo.contains(blockManagerId)) {
-      if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
-        sender ! true
-      } else {
-        sender ! false
-      }
+  val timeout = 10.seconds
+  var masterActor: ActorRef = {
+    if (isMaster) {
+      val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
+        name = MASTER_AKKA_ACTOR_NAME)
+      logInfo("Registered BlockManagerMaster Actor")
+      masterActor
     } else {
-      blockManagerInfo(blockManagerId).updateLastSeenMs()
-      sender ! true
+      val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
+      logInfo("Connecting to BlockManagerMaster: " + url)
+      actorSystem.actorFor(url)
     }
   }
 
-  def receive = {
-    case RegisterBlockManager(blockManagerId, maxMemSize) =>
-      register(blockManagerId, maxMemSize)
-
-    case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
-      blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
-
-    case GetLocations(blockId) =>
-      getLocations(blockId)
-
-    case GetLocationsMultipleBlockIds(blockIds) =>
-      getLocationsMultipleBlockIds(blockIds)
-
-    case GetPeers(blockManagerId, size) =>
-      getPeersDeterministic(blockManagerId, size)
-      /*getPeers(blockManagerId, size)*/
-
-    case GetMemoryStatus =>
-      getMemoryStatus
-
-    case RemoveHost(host) =>
-      removeHost(host)
-      sender ! true
-
-    case StopBlockManagerMaster =>
-      logInfo("Stopping BlockManagerMaster")
-      sender ! true
-      if (timeoutCheckingTask != null) {
-        timeoutCheckingTask.cancel
-      }
-      context.stop(self)
-
-    case ExpireDeadHosts =>
-      expireDeadHosts()
-
-    case HeartBeat(blockManagerId) =>
-      heartBeat(blockManagerId)
-
-    case other =>
-      logInfo("Got unknown message: " + other)
+  /** Remove a dead host from the master actor. This is only called on the master side. */
+  def notifyADeadHost(host: String) {
+    tell(RemoveHost(host))
+    logInfo("Removed " + host + " successfully in notifyADeadHost")
   }
 
-  // Return a map from the block manager id to max memory and remaining memory.
-  private def getMemoryStatus() {
-    val res = blockManagerInfo.map { case(blockManagerId, info) =>
-      (blockManagerId, (info.maxMem, info.remainingMem))
-    }.toMap
-    sender ! res
+  /**
+   * Send the master actor a heart beat from the slave. Returns true if everything works out,
+   * false if the master does not know about the given block manager, which means the block
+   * manager should re-register.
+   */
+  def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
+    askMasterWithRetry[Boolean](HeartBeat(blockManagerId))
   }
 
-  private def register(blockManagerId: BlockManagerId, maxMemSize: Long) {
-    val startTimeMs = System.currentTimeMillis()
-    val tmp = " " + blockManagerId + " "
-    logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
-    if (blockManagerIdByHost.contains(blockManagerId.ip) &&
-        blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
-      val oldId = blockManagerIdByHost(blockManagerId.ip)
-      logInfo("Got second registration for host " + blockManagerId +
-              "; removing old slave " + oldId)
-      removeBlockManager(oldId)
-    }
-    if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
-      logInfo("Got Register Msg from master node, don't register it")
-    } else {
-      blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
-        blockManagerId, System.currentTimeMillis(), maxMemSize))
-    }
-    blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
-    logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
-    sender ! true
+  /** Register the BlockManager's id with the master. */
+  def registerBlockManager(
+      blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+    logInfo("Trying to register BlockManager")
+    tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
+    logInfo("Registered BlockManager")
   }
 
-  private def blockUpdate(
+  def updateBlockInfo(
       blockManagerId: BlockManagerId,
       blockId: String,
       storageLevel: StorageLevel,
       memSize: Long,
-      diskSize: Long) {
-
-    val startTimeMs = System.currentTimeMillis()
-    val tmp = " " + blockManagerId + " " + blockId + " "
-
-    if (!blockManagerInfo.contains(blockManagerId)) {
-      if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
-        // We intentionally do not register the master (except in local mode),
-        // so we should not indicate failure.
-        sender ! true
-      } else {
-        sender ! false
-      }
-      return
-    }
-
-    if (blockId == null) {
-      blockManagerInfo(blockManagerId).updateLastSeenMs()
-      logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
-      sender ! true
-      return
-    }
-
-    blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
-
-    var locations: HashSet[BlockManagerId] = null
-    if (blockInfo.containsKey(blockId)) {
-      locations = blockInfo.get(blockId)._2
-    } else {
-      locations = new HashSet[BlockManagerId]
-      blockInfo.put(blockId, (storageLevel.replication, locations))
-    }
-
-    if (storageLevel.isValid) {
-      locations += blockManagerId
-    } else {
-      locations.remove(blockManagerId)
-    }
-
-    if (locations.size == 0) {
-      blockInfo.remove(blockId)
-    }
-    sender ! true
+      diskSize: Long): Boolean = {
+    val res = askMasterWithRetry[Boolean](
+      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
+    logInfo("Updated info of block " + blockId)
+    res
   }
 
-  private def getLocations(blockId: String) {
-    val startTimeMs = System.currentTimeMillis()
-    val tmp = " " + blockId + " "
-    logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
-    if (blockInfo.containsKey(blockId)) {
-      var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-      res.appendAll(blockInfo.get(blockId)._2)
-      logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
-          + Utils.getUsedTimeMs(startTimeMs))
-      sender ! res.toSeq
-    } else {
-      logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs))
-      var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-      sender ! res
-    }
+  /** Get locations of the blockId from the master */
+  def getLocations(blockId: String): Seq[BlockManagerId] = {
+    askMasterWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
   }
 
-  private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
-    def getLocations(blockId: String): Seq[BlockManagerId] = {
-      val tmp = blockId
-      logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp)
-      if (blockInfo.containsKey(blockId)) {
-        var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-        res.appendAll(blockInfo.get(blockId)._2)
-        logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq)
-        return res.toSeq
-      } else {
-        logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp)
-        var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-        return res.toSeq
-      }
-    }
-
-    logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
-    var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
-    for (blockId <- blockIds) {
-      res.append(getLocations(blockId))
-    }
-    logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq)
-    sender ! res.toSeq
+  /** Get locations of multiple blockIds from the master */
+  def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
+    askMasterWithRetry[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
   }
 
-  private def getPeers(blockManagerId: BlockManagerId, size: Int) {
-    var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-    var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-    res.appendAll(peers)
-    res -= blockManagerId
-    val rand = new Random(System.currentTimeMillis())
-    while (res.length > size) {
-      res.remove(rand.nextInt(res.length))
+  /** Get ids of other nodes in the cluster from the master */
+  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
+    val result = askMasterWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
+    if (result.length != numPeers) {
+      throw new SparkException(
+        "Error getting peers, only got " + result.size + " instead of " + numPeers)
     }
-    sender ! res.toSeq
+    result
   }
 
-  private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
-    var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-    var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-
-    val peersWithIndices = peers.zipWithIndex
-    val selfIndex = peersWithIndices.find(_._1 == blockManagerId).map(_._2).getOrElse(-1)
-    if (selfIndex == -1) {
-      throw new Exception("Self index for " + blockManagerId + " not found")
-    }
-
-    var index = selfIndex
-    while (res.size < size) {
-      index += 1
-      if (index == selfIndex) {
-        throw new Exception("More peer expected than available")
-      }
-      res += peers(index % peers.size)
-    }
-    sender ! res.toSeq
+  /**
+   * Remove a block from the slaves that have it. This can only be used to remove
+   * blocks that the master knows about.
+   */
+  def removeBlock(blockId: String) {
+    askMasterWithRetry[Boolean](RemoveBlock(blockId))
   }
-}
-
-private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean)
-  extends Logging {
-
-  val AKKA_ACTOR_NAME: String = "BlockMasterManager"
-  val REQUEST_RETRY_INTERVAL_MS = 100
-  val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost")
-  val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt
-  val DEFAULT_MANAGER_IP: String = Utils.localHostName()
 
-  val timeout = 10.seconds
-  var masterActor: ActorRef = null
-
-  if (isMaster) {
-    masterActor = actorSystem.actorOf(
-      Props(new BlockManagerMasterActor(isLocal)), name = AKKA_ACTOR_NAME)
-    logInfo("Registered BlockManagerMaster Actor")
-  } else {
-    val url = "akka://spark@%s:%s/user/%s".format(
-      DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME)
-    logInfo("Connecting to BlockManagerMaster: " + url)
-    masterActor = actorSystem.actorFor(url)
+  /**
+   * Return the memory status for each block manager, in the form of a map from
+   * the block manager's id to two long values. The first value is the maximum
+   * amount of memory allocated for the block manager, while the second is the
+   * amount of remaining memory.
+   */
+  def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
+    askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
   }
 
+  /** Stop the master actor, called only on the Spark master node */
   def stop() {
     if (masterActor != null) {
-      communicate(StopBlockManagerMaster)
+      tell(StopBlockManagerMaster)
       masterActor = null
       logInfo("BlockManagerMaster stopped")
     }
   }
 
-  // Send a message to the master actor and get its result within a default timeout, or
-  // throw a SparkException if this fails.
-  def askMaster(message: Any): Any = {
-    try {
-      val future = masterActor.ask(message)(timeout)
-      return Await.result(future, timeout)
-    } catch {
-      case e: Exception =>
-        throw new SparkException("Error communicating with BlockManagerMaster", e)
-    }
-  }
-
-  // Send a one-way message to the master actor, to which we expect it to reply with true.
-  def communicate(message: Any) {
-    if (askMaster(message) != true) {
-      throw new SparkException("Error reply received from BlockManagerMaster")
-    }
-  }
-
-  def notifyADeadHost(host: String) {
-    communicate(RemoveHost(host))
-    logInfo("Removed " + host + " successfully in notifyADeadHost")
-  }
-
-  def mustRegisterBlockManager(msg: RegisterBlockManager) {
-    logInfo("Trying to register BlockManager")
-    while (! syncRegisterBlockManager(msg)) {
-      logWarning("Failed to register " + msg)
-      Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
-    }
-    logInfo("Done registering BlockManager")
-  }
-
-  def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = {
-    //val masterActor = RemoteActor.select(node, name)
-    val startTimeMs = System.currentTimeMillis()
-    val tmp = " msg " + msg + " "
-    logDebug("Got in syncRegisterBlockManager 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
-    try {
-      communicate(msg)
-      logInfo("BlockManager registered successfully @ syncRegisterBlockManager")
-      logDebug("Got in syncRegisterBlockManager 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-      return true
-    } catch {
-      case e: Exception =>
-        logError("Failed in syncRegisterBlockManager", e)
-        return false
-    }
-  }
-
-  def mustHeartBeat(msg: HeartBeat): Boolean = {
-    var res = syncHeartBeat(msg)
-    while (!res.isDefined) {
-      logWarning("Failed to send heart beat " + msg)
-      Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
+  /** Send a one-way message to the master actor, to which we expect it to reply with true. */
+  private def tell(message: Any) {
+    if (!askMasterWithRetry[Boolean](message)) {
+      throw new SparkException("BlockManagerMasterActor returned false, expected true.")
     }
-    return res.get
   }
 
-  def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
-    try {
-      val answer = askMaster(msg).asInstanceOf[Boolean]
-      return Some(answer)
-    } catch {
-      case e: Exception =>
-        logError("Failed in syncHeartBeat", e)
-        return None
+  /**
+   * Send a message to the master actor and get its result within a default timeout, or
+   * throw a SparkException if this fails.
+   */
+  private def askMasterWithRetry[T](message: Any): T = {
+    // TODO: Consider removing multiple attempts
+    if (masterActor == null) {
+      throw new SparkException("Error sending message to BlockManager as masterActor is null " +
+        "[message = " + message + "]")
     }
-  }
-
-  def mustBlockUpdate(msg: BlockUpdate): Boolean = {
-    var res = syncBlockUpdate(msg)
-    while (!res.isDefined) {
-      logWarning("Failed to send block update " + msg)
-      Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
-    }
-    return res.get
-  }
-
-  def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
-    val startTimeMs = System.currentTimeMillis()
-    val tmp = " msg " + msg + " "
-    logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
-
-    try {
-      val answer = askMaster(msg).asInstanceOf[Boolean]
-      logDebug("Block update sent successfully")
-      logDebug("Got in synbBlockUpdate " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
-      return Some(answer)
-    } catch {
-      case e: Exception =>
-        logError("Failed in syncBlockUpdate", e)
-        return None
-    }
-  }
-
-  def mustGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
-    var res = syncGetLocations(msg)
-    while (res == null) {
-      logInfo("Failed to get locations " + msg)
-      Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
-      res = syncGetLocations(msg)
-    }
-    return res
-  }
-
-  def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
-    val startTimeMs = System.currentTimeMillis()
-    val tmp = " msg " + msg + " "
-    logDebug("Got in syncGetLocations 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
-    try {
-      val answer = askMaster(msg).asInstanceOf[ArrayBuffer[BlockManagerId]]
-      if (answer != null) {
-        logDebug("GetLocations successful")
-        logDebug("Got in syncGetLocations 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-        return answer
-      } else {
-        logError("Master replied null in response to GetLocations")
-        return null
+    var attempts = 0
+    var lastException: Exception = null
+    while (attempts < AKKA_RETRY_ATTEMPTS) {
+      attempts += 1
+      try {
+        val future = masterActor.ask(message)(timeout)
+        val result = Await.result(future, timeout)
+        if (result == null) {
+          throw new Exception("BlockManagerMaster returned null")
+        }
+        return result.asInstanceOf[T]
+      } catch {
+        case ie: InterruptedException => throw ie
+        case e: Exception =>
+          lastException = e
+          logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
       }
-    } catch {
-      case e: Exception =>
-        logError("GetLocations failed", e)
-        return null
+      Thread.sleep(AKKA_RETRY_INTERVAL_MS)
     }
-  }
 
-  def mustGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
-       Seq[Seq[BlockManagerId]] = {
-    var res: Seq[Seq[BlockManagerId]] = syncGetLocationsMultipleBlockIds(msg)
-    while (res == null) {
-      logWarning("Failed to GetLocationsMultipleBlockIds " + msg)
-      Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
-      res = syncGetLocationsMultipleBlockIds(msg)
-    }
-    return res
+    throw new SparkException(
+      "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
   }
 
-  def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
-      Seq[Seq[BlockManagerId]] = {
-    val startTimeMs = System.currentTimeMillis
-    val tmp = " msg " + msg + " "
-    logDebug("Got in syncGetLocationsMultipleBlockIds 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
-    try {
-      val answer = askMaster(msg).asInstanceOf[Seq[Seq[BlockManagerId]]]
-      if (answer != null) {
-        logDebug("GetLocationsMultipleBlockIds successful")
-        logDebug("Got in syncGetLocationsMultipleBlockIds 1 " + tmp +
-          Utils.getUsedTimeMs(startTimeMs))
-        return answer
-      } else {
-        logError("Master replied null in response to GetLocationsMultipleBlockIds")
-        return null
-      }
-    } catch {
-      case e: Exception =>
-        logError("GetLocationsMultipleBlockIds failed", e)
-        return null
-    }
-  }
-
-  def mustGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
-    var res = syncGetPeers(msg)
-    while ((res == null) || (res.length != msg.size)) {
-      logInfo("Failed to get peers " + msg)
-      Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
-      res = syncGetPeers(msg)
-    }
-
-    return res
-  }
-
-  def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
-    val startTimeMs = System.currentTimeMillis
-    val tmp = " msg " + msg + " "
-    logDebug("Got in syncGetPeers 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
-    try {
-      val answer = askMaster(msg).asInstanceOf[Seq[BlockManagerId]]
-      if (answer != null) {
-        logDebug("GetPeers successful")
-        logDebug("Got in syncGetPeers 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-        return answer
-      } else {
-        logError("Master replied null in response to GetPeers")
-        return null
-      }
-    } catch {
-      case e: Exception =>
-        logError("GetPeers failed", e)
-        return null
-    }
-  }
-
-  def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]]
-  }
 }
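
The must*/sync* retry pairs collapse into the single askMasterWithRetry helper above.
Its shape is a standard bounded-retry ask loop; a generic sketch with illustrative
names, using the same Akka 2.0-era ask/Await API as the patch:

    import akka.actor.ActorRef
    import akka.dispatch.Await
    import akka.pattern.ask
    import akka.util.Duration

    object AskWithRetrySketch {
      // Ask, await, reject null replies, and retry up to `attempts` times
      // before surfacing the last failure.
      def askWithRetry[T](actor: ActorRef, message: Any, attempts: Int,
          intervalMs: Long, timeout: Duration): T = {
        var lastException: Exception = null
        var tried = 0
        while (tried < attempts) {
          tried += 1
          try {
            val result = Await.result(actor.ask(message)(timeout), timeout)
            if (result == null) {
              throw new Exception("Actor returned null")
            }
            return result.asInstanceOf[T]
          } catch {
            case ie: InterruptedException => throw ie
            case e: Exception => lastException = e
          }
          Thread.sleep(intervalMs)
        }
        throw new RuntimeException("Gave up after " + attempts + " attempts", lastException)
      }
    }
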
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f4d026da3329c801775275899a7f2dd94136f6dc
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -0,0 +1,401 @@
+package spark.storage
+
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+import akka.actor.{Actor, ActorRef, Cancellable}
+import akka.util.{Duration, Timeout}
+import akka.util.duration._
+
+import spark.{Logging, Utils}
+
+/**
+ * BlockManagerMasterActor is an actor on the master node to track statuses of
+ * all slaves' block managers.
+ */
+private[spark]
+class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
+
+  // Mapping from block manager id to the block manager's information.
+  private val blockManagerInfo =
+    new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
+
+  // Mapping from host name to block manager id. We allow multiple block managers
+  // on the same host name (ip).
+  private val blockManagerIdByHost = new HashMap[String, ArrayBuffer[BlockManagerId]]
+
+  // Mapping from block id to the set of block managers that have the block.
+  private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
+
+  initLogging()
+
+  val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
+    "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
+
+  val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
+    "5000").toLong
+
+  var timeoutCheckingTask: Cancellable = null
+
+  override def preStart() {
+    if (!BlockManager.getDisableHeartBeatsForTesting) {
+      timeoutCheckingTask = context.system.scheduler.schedule(
+        0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+    }
+    super.preStart()
+  }
+
+  def receive = {
+    case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
+      register(blockManagerId, maxMemSize, slaveActor)
+
+    case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+      updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
+
+    case GetLocations(blockId) =>
+      getLocations(blockId)
+
+    case GetLocationsMultipleBlockIds(blockIds) =>
+      getLocationsMultipleBlockIds(blockIds)
+
+    case GetPeers(blockManagerId, size) =>
+      getPeersDeterministic(blockManagerId, size)
+      /*getPeers(blockManagerId, size)*/
+
+    case GetMemoryStatus =>
+      getMemoryStatus
+
+    case RemoveBlock(blockId) =>
+      removeBlock(blockId)
+
+    case RemoveHost(host) =>
+      removeHost(host)
+      sender ! true
+
+    case StopBlockManagerMaster =>
+      logInfo("Stopping BlockManagerMaster")
+      sender ! true
+      if (timeoutCheckingTask != null) {
+        timeoutCheckingTask.cancel
+      }
+      context.stop(self)
+
+    case ExpireDeadHosts =>
+      expireDeadHosts()
+
+    case HeartBeat(blockManagerId) =>
+      heartBeat(blockManagerId)
+
+    case other =>
+      logInfo("Got unknown message: " + other)
+  }
+
+  def removeBlockManager(blockManagerId: BlockManagerId) {
+    val info = blockManagerInfo(blockManagerId)
+
+    // Remove the block manager from blockManagerIdByHost. If the list of block
+    // managers belonging to the IP is empty, remove the entry from the hash map.
+    blockManagerIdByHost.get(blockManagerId.ip).foreach { managers: ArrayBuffer[BlockManagerId] =>
+      managers -= blockManagerId
+      if (managers.size == 0) blockManagerIdByHost.remove(blockManagerId.ip)
+    }
+
+    // Remove it from blockManagerInfo and remove all the blocks.
+    blockManagerInfo.remove(blockManagerId)
+    var iterator = info.blocks.keySet.iterator
+    while (iterator.hasNext) {
+      val blockId = iterator.next
+      val locations = blockLocations.get(blockId)._2
+      locations -= blockManagerId
+      if (locations.size == 0) {
+        blockLocations.remove(blockId)
+      }
+    }
+  }
+
+  def expireDeadHosts() {
+    logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
+    val now = System.currentTimeMillis()
+    val minSeenTime = now - slaveTimeout
+    val toRemove = new HashSet[BlockManagerId]
+    for (info <- blockManagerInfo.values) {
+      if (info.lastSeenMs < minSeenTime) {
+        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
+        toRemove += info.blockManagerId
+      }
+    }
+    toRemove.foreach(removeBlockManager)
+  }
+
+  def removeHost(host: String) {
+    logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
+    logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
+    blockManagerIdByHost.get(host).foreach(_.foreach(removeBlockManager))
+    logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
+    sender ! true
+  }
+
+  def heartBeat(blockManagerId: BlockManagerId) {
+    if (!blockManagerInfo.contains(blockManagerId)) {
+      if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+        sender ! true
+      } else {
+        sender ! false
+      }
+    } else {
+      blockManagerInfo(blockManagerId).updateLastSeenMs()
+      sender ! true
+    }
+  }
+
+  // Remove a block from the slaves that have it. This can only be used to remove
+  // blocks that the master knows about.
+  private def removeBlock(blockId: String) {
+    val block = blockLocations.get(blockId)
+    if (block != null) {
+      block._2.foreach { blockManagerId: BlockManagerId =>
+        val blockManager = blockManagerInfo.get(blockManagerId)
+        if (blockManager.isDefined) {
+          // Remove the block from the slave's BlockManager.
+          // Doesn't actually wait for a confirmation and the message might get lost.
+          // If message loss becomes frequent, we should add retry logic here.
+          blockManager.get.slaveActor ! RemoveBlock(blockId)
+        }
+      }
+    }
+    sender ! true
+  }
+
+  // Return a map from the block manager id to max memory and remaining memory.
+  private def getMemoryStatus() {
+    val res = blockManagerInfo.map { case(blockManagerId, info) =>
+      (blockManagerId, (info.maxMem, info.remainingMem))
+    }.toMap
+    sender ! res
+  }
+
+  private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+    val startTimeMs = System.currentTimeMillis()
+    val tmp = " " + blockManagerId + " "
+
+    if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+      logInfo("Got Register Msg from master node, don't register it")
+    } else {
+      blockManagerIdByHost.get(blockManagerId.ip) match {
+        case Some(managers) =>
+          // A block manager of the same host name already exists.
+          logInfo("Got another registration for host " + blockManagerId)
+          managers += blockManagerId
+        case None =>
+          blockManagerIdByHost += (blockManagerId.ip -> ArrayBuffer(blockManagerId))
+      }
+
+      blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo(
+        blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
+    }
+    sender ! true
+  }
+
+  private def updateBlockInfo(
+      blockManagerId: BlockManagerId,
+      blockId: String,
+      storageLevel: StorageLevel,
+      memSize: Long,
+      diskSize: Long) {
+
+    val startTimeMs = System.currentTimeMillis()
+    val tmp = " " + blockManagerId + " " + blockId + " "
+
+    if (!blockManagerInfo.contains(blockManagerId)) {
+      if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+        // We intentionally do not register the master (except in local mode),
+        // so we should not indicate failure.
+        sender ! true
+      } else {
+        sender ! false
+      }
+      return
+    }
+
+    if (blockId == null) {
+      blockManagerInfo(blockManagerId).updateLastSeenMs()
+      sender ! true
+      return
+    }
+
+    blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
+
+    var locations: HashSet[BlockManagerId] = null
+    if (blockLocations.containsKey(blockId)) {
+      locations = blockLocations.get(blockId)._2
+    } else {
+      locations = new HashSet[BlockManagerId]
+      blockLocations.put(blockId, (storageLevel.replication, locations))
+    }
+
+    if (storageLevel.isValid) {
+      locations.add(blockManagerId)
+    } else {
+      locations.remove(blockManagerId)
+    }
+
+    // Remove the block from master tracking if it has been removed on all slaves.
+    if (locations.size == 0) {
+      blockLocations.remove(blockId)
+    }
+    sender ! true
+  }
+
+  private def getLocations(blockId: String) {
+    if (blockLocations.containsKey(blockId)) {
+      sender ! blockLocations.get(blockId)._2.toSeq
+    } else {
+      sender ! Seq.empty[BlockManagerId]
+    }
+  }
+
+  private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
+    def getLocations(blockId: String): Seq[BlockManagerId] = {
+      if (blockLocations.containsKey(blockId)) {
+        blockLocations.get(blockId)._2.toSeq
+      } else {
+        Seq.empty[BlockManagerId]
+      }
+    }
+
+    sender ! blockIds.map(getLocations).toSeq
+  }
+
+  private def getPeers(blockManagerId: BlockManagerId, size: Int) {
+    val res = new ArrayBuffer[BlockManagerId]
+    res.appendAll(blockManagerInfo.keySet)
+    res -= blockManagerId
+    val rand = new Random(System.currentTimeMillis())
+    while (res.length > size) {
+      res.remove(rand.nextInt(res.length))
+    }
+    sender ! res.toSeq
+  }
+
+  private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
+    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
+    val res = new ArrayBuffer[BlockManagerId]
+
+    val selfIndex = peers.indexOf(blockManagerId)
+    if (selfIndex == -1) {
+      throw new Exception("Self index for " + blockManagerId + " not found")
+    }
+
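+    // For example, with peers [A, B, C] and self = B, candidates are tried in ring order: C, A.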
+    // Walk the ring of peers starting just after this node. If we wrap all the way back to
+    // ourselves before finding `size` distinct peers, fail rather than return duplicates.
+    var index = selfIndex
+    while (res.size < size) {
+      index += 1
+      if (index % peers.size == selfIndex) {
+        throw new Exception("More peers expected than available")
+      }
+      res += peers(index % peers.size)
+    }
+    sender ! res.toSeq
+  }
+}
+
+
+private[spark]
+object BlockManagerMasterActor {
+
+  case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+
+  class BlockManagerInfo(
+      val blockManagerId: BlockManagerId,
+      timeMs: Long,
+      val maxMem: Long,
+      val slaveActor: ActorRef)
+    extends Logging {
+
+    private var _lastSeenMs: Long = timeMs
+    private var _remainingMem: Long = maxMem
+
+    // Mapping from block id to its status.
+    private val _blocks = new JHashMap[String, BlockStatus]
+
+    logInfo("Registering block manager %s:%d with %s RAM".format(
+      blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
+
+    def updateLastSeenMs() {
+      _lastSeenMs = System.currentTimeMillis()
+    }
+
+    def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+      : Unit = synchronized {
+
+      updateLastSeenMs()
+
+      if (_blocks.containsKey(blockId)) {
+        // The block exists on the slave already.
+        val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
+
+        if (originalLevel.useMemory) {
+          _remainingMem += memSize
+        }
+      }
+
+      if (storageLevel.isValid) {
+        // isValid means it is either stored in-memory or on-disk.
+        _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
+        if (storageLevel.useMemory) {
+          _remainingMem -= memSize
+          logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
+            Utils.memoryBytesToString(_remainingMem)))
+        }
+        if (storageLevel.useDisk) {
+          logInfo("Added %s on disk on %s:%d (size: %s)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
+        }
+      } else if (_blocks.containsKey(blockId)) {
+        // If isValid is not true, drop the block.
+        val blockStatus: BlockStatus = _blocks.get(blockId)
+        _blocks.remove(blockId)
+        if (blockStatus.storageLevel.useMemory) {
+          _remainingMem += blockStatus.memSize
+          logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
+            Utils.memoryBytesToString(_remainingMem)))
+        }
+        if (blockStatus.storageLevel.useDisk) {
+          logInfo("Removed %s on %s:%d on disk (size: %s)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
+        }
+      }
+    }
+
+    def remainingMem: Long = _remainingMem
+
+    def lastSeenMs: Long = _lastSeenMs
+
+    def blocks: JHashMap[String, BlockStatus] = _blocks
+
+    override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
+
+    def clear() {
+      _blocks.clear()
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d73a9b790f4793596ed9fce418af83fc62a7edf5
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -0,0 +1,102 @@
+package spark.storage
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import akka.actor.ActorRef
+
+
+//////////////////////////////////////////////////////////////////////////////////
+// Messages from the master to slaves.
+//////////////////////////////////////////////////////////////////////////////////
+private[spark]
+sealed trait ToBlockManagerSlave
+
+// Remove a block from the slaves that have it. This can only be used to remove
+// blocks that the master knows about.
+private[spark]
+case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
+
+
+//////////////////////////////////////////////////////////////////////////////////
+// Messages from slaves to the master.
+//////////////////////////////////////////////////////////////////////////////////
+private[spark]
+sealed trait ToBlockManagerMaster
+
+private[spark]
+case class RegisterBlockManager(
+    blockManagerId: BlockManagerId,
+    maxMemSize: Long,
+    sender: ActorRef)
+  extends ToBlockManagerMaster
+
+private[spark]
+case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
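+// UpdateBlockInfo is a plain class with hand-written Externalizable serialization rather than a
+// case class, presumably to keep the wire format compact; the companion object below restores
+// apply() and unapply() so call sites can construct and pattern-match it like a case class.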
+private[spark]
+class UpdateBlockInfo(
+    var blockManagerId: BlockManagerId,
+    var blockId: String,
+    var storageLevel: StorageLevel,
+    var memSize: Long,
+    var diskSize: Long)
+  extends ToBlockManagerMaster
+  with Externalizable {
+
+  def this() = this(null, null, null, 0, 0)  // For deserialization only
+
+  override def writeExternal(out: ObjectOutput) {
+    blockManagerId.writeExternal(out)
+    out.writeUTF(blockId)
+    storageLevel.writeExternal(out)
+    out.writeLong(memSize)
+    out.writeLong(diskSize)
+  }
+
+  override def readExternal(in: ObjectInput) {
+    blockManagerId = new BlockManagerId()
+    blockManagerId.readExternal(in)
+    blockId = in.readUTF()
+    storageLevel = new StorageLevel()
+    storageLevel.readExternal(in)
+    memSize = in.readLong()
+    diskSize = in.readLong()
+  }
+}
+
+private[spark]
+object UpdateBlockInfo {
+  def apply(blockManagerId: BlockManagerId,
+      blockId: String,
+      storageLevel: StorageLevel,
+      memSize: Long,
+      diskSize: Long): UpdateBlockInfo = {
+    new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
+  }
+
+  // For pattern-matching
+  def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+    Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
+  }
+}
+
+private[spark]
+case class GetLocations(blockId: String) extends ToBlockManagerMaster
+
+private[spark]
+case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
+
+private[spark]
+case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+
+private[spark]
+case class RemoveHost(host: String) extends ToBlockManagerMaster
+
+private[spark]
+case object StopBlockManagerMaster extends ToBlockManagerMaster
+
+private[spark]
+case object GetMemoryStatus extends ToBlockManagerMaster
+
+private[spark]
+case object ExpireDeadHosts extends ToBlockManagerMaster
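+
+// A minimal sketch of how a client would send one of these messages and await the reply
+// (illustrative only: the BlockManagerMaster wrapper owns this logic in practice, and the
+// masterActor and akkaTimeout names below are made up):
+//
+//   import akka.pattern.ask
+//   val future = masterActor.ask(GetLocations("rdd_0_0"))(akkaTimeout)
+//   val locations = Await.result(future, akkaTimeout.duration).asInstanceOf[Seq[BlockManagerId]]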
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f570cdc52dd1b2347b8cca8eab62af80b318815e
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -0,0 +1,16 @@
+package spark.storage
+
+import akka.actor.Actor
+
+
+/**
+ * An actor that takes commands from the master and executes them on the slave. For example,
+ * this is used to remove blocks from the slave's BlockManager.
+ */
+class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
+  override def receive = {
+    case RemoveBlock(blockId) => blockManager.removeBlock(blockId)
+  }
+}
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 096bf8bdd967c05f5a9e55de844a2084b8a681be..8188d3595eda6762b965beb0711713093038fb5c 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -31,7 +31,12 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
 
   def getValues(blockId: String): Option[Iterator[Any]]
 
-  def remove(blockId: String)
+  /**
+   * Remove a block, if it exists.
+   * @param blockId the block to remove.
+   * @return True if the block was found and removed, False otherwise.
+   */
+  def remove(blockId: String): Boolean
 
   def contains(blockId: String): Boolean
 
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index b5561479dbf50ff225511d31df387fe48bc9a296..7e5b820cbbdc6ca145c2eb7c6787bd2c137c80d0 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -92,10 +92,13 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes))
   }
 
-  override def remove(blockId: String) {
+  override def remove(blockId: String): Boolean = {
     val file = getFile(blockId)
     if (file.exists()) {
       file.delete()
+    } else {
+      false
     }
   }
 
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 02098b82fe77380eeee685f6c7b5b23b6df4a51f..00e32f753c1604507f7fc2b05eae0066729defa3 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -90,7 +90,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
-  override def remove(blockId: String) {
+  override def remove(blockId: String): Boolean = {
     entries.synchronized {
       val entry = entries.get(blockId)
       if (entry != null) {
@@ -98,8 +98,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         currentMemory -= entry.size
         logInfo("Block %s of size %d dropped from memory (free %d)".format(
           blockId, entry.size, freeMemory))
+        true
       } else {
-        logWarning("Block " + blockId + " could not be removed as it does not exist")
+        false
       }
     }
   }
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index c497f03e0c3e7df9fb6304e9afb7c742c0050570..e3544e5aae28ca5d41ba524ed980e5af98c4f3d7 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -1,6 +1,6 @@
 package spark.storage
 
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 
 /**
  * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
@@ -10,14 +10,16 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
  * commonly useful storage levels.
  */
 class StorageLevel(
-    var useDisk: Boolean, 
+    var useDisk: Boolean,
     var useMemory: Boolean,
     var deserialized: Boolean,
     var replication: Int = 1)
   extends Externalizable {
 
   // TODO: Also add fields for caching priority, dataset ID, and flushing.
-  
+
+  assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
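+  // hashCode below is toInt * 41 + replication, which is collision-free while replication < 41.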
+
   def this(flags: Int, replication: Int) {
     this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
   }
@@ -29,14 +31,14 @@ class StorageLevel(
 
   override def equals(other: Any): Boolean = other match {
     case s: StorageLevel =>
-      s.useDisk == useDisk && 
+      s.useDisk == useDisk &&
       s.useMemory == useMemory &&
       s.deserialized == deserialized &&
-      s.replication == replication 
+      s.replication == replication
     case _ =>
       false
   }
- 
+
   def isValid = ((useMemory || useDisk) && (replication > 0))
 
   def toInt: Int = {
@@ -66,10 +68,16 @@ class StorageLevel(
     replication = in.readByte()
   }
 
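+  // Replace a deserialized StorageLevel with its canonical cached instance, so that repeated
+  // deserialization on long-running nodes does not accumulate duplicate objects and reference
+  // equality holds for equal levels.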
+  @throws(classOf[IOException])
+  private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
+
   override def toString: String =
     "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
+
+  override def hashCode(): Int = toInt * 41 + replication
 }
 
+
 object StorageLevel {
   val NONE = new StorageLevel(false, false, false)
   val DISK_ONLY = new StorageLevel(true, false, false)
@@ -82,4 +90,16 @@ object StorageLevel {
   val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
   val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
+
+  private[spark]
+  val storageLevelCache = new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
+
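+  // Return the canonical cached instance for the given level, caching it first if it is new.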
+  private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
+    // putIfAbsent makes the check-then-insert atomic, so concurrent deserialization of equal
+    // levels always canonicalizes to a single instance.
+    val cached = storageLevelCache.putIfAbsent(level, level)
+    if (cached == null) level else cached
+  }
 }
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index 5bb5a29cc42243a95c160f2eb62a65a8ba1e2792..689f07b9692fbf5c68b78080f85222d86fbe8582 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -58,8 +58,10 @@ private[spark] object ThreadingTest {
         val startTime = System.currentTimeMillis()
         manager.get(blockId) match {
           case Some(retrievedBlock) =>
-            assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
-            println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+            assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+              "Block " + blockId + " did not match")
+            println("Got block " + blockId + " in " +
+              (System.currentTimeMillis - startTime) + " ms")
           case None =>
             assert(false, "Block " + blockId + " could not be retrieved")
         }
@@ -73,7 +75,9 @@ private[spark] object ThreadingTest {
     System.setProperty("spark.kryoserializer.buffer.mb", "1")
     val actorSystem = ActorSystem("test")
     val serializer = new KryoSerializer
-    val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
+    val masterIp: String = System.getProperty("spark.master.host", "localhost")
+    val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+    val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, masterIp, masterPort)
     val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
     val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
@@ -86,6 +90,7 @@ private[spark] object ThreadingTest {
     actorSystem.shutdown()
     actorSystem.awaitTermination()
     println("Everything stopped.")
-    println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
+    println(
+      "It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
   }
 }
diff --git a/core/src/main/scala/spark/util/IdGenerator.scala b/core/src/main/scala/spark/util/IdGenerator.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b6e309fe1ae68f7c06361dc12f0edbce5f5c3f9a
--- /dev/null
+++ b/core/src/main/scala/spark/util/IdGenerator.scala
@@ -0,0 +1,14 @@
+package spark.util
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * A utility for getting unique IDs. It is a thin wrapper around Java's AtomicInteger.
+ * One example use is in BlockManager, where each BlockManager instance starts an Akka
+ * actor and this utility assigns the actors unique names.
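+ *
+ * A minimal usage sketch (the actor-name prefix is illustrative):
+ * {{{
+ *   val idGenerator = new IdGenerator
+ *   val actorName = "BlockManagerActor" + idGenerator.next
+ * }}}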
+ */
+private[spark] class IdGenerator {
+  private val id = new AtomicInteger(0)
+  def next: Int = id.incrementAndGet
+}
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
new file mode 100644
index 0000000000000000000000000000000000000000..19e67acd0c7423ad2175afe44571e575318a893a
--- /dev/null
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -0,0 +1,35 @@
+package spark.util
+
+import java.util.{Timer, TimerTask}
+
+import spark.Logging
+
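+/**
+ * Runs a TimerTask to periodically clean up metadata, e.g. old entries in a
+ * TimeStampedHashMap. The cleanup delay is read from the "spark.cleanup.delay" system
+ * property, in minutes; a non-positive value (the default) disables cleanup entirely.
+ * A usage sketch, assuming the caller owns the map being cleaned:
+ * {{{
+ *   val taskInfos = new TimeStampedHashMap[String, String]
+ *   val cleaner = new MetadataCleaner("taskInfos", taskInfos.cleanup)
+ *   // ... and on shutdown:
+ *   cleaner.cancel()
+ * }}}
+ */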
+class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
+
+  val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt
+  val periodSeconds = math.max(10, delaySeconds / 10)
+  val timer = new Timer(name + " cleanup timer", true)
+
+  val task = new TimerTask {
+    def run() {
+      try {
+        if (delaySeconds > 0) {
+          cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
+          logInfo("Ran metadata cleaner for " + name)
+        }
+      } catch {
+        case e: Exception => logError("Error running cleanup task for " + name, e)
+      }
+    }
+  }
+  if (delaySeconds > 0) {
+    logInfo("Starting metadata cleaner for " + name + " with delay of " + delaySeconds +
+      " seconds and period of " + periodSeconds + " seconds")
+    timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
+  }
+
+  def cancel() {
+    timer.cancel()
+  }
+}
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
new file mode 100644
index 0000000000000000000000000000000000000000..070ee19ac0753fd4140f70c8f6aa23cec102348c
--- /dev/null
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -0,0 +1,87 @@
+package spark.util
+
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, Map}
+
+/**
+ * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
+ * timestamp along with each key-value pair. Key-value pairs older than a given threshold
+ * can then be removed using the cleanup() method. This is intended to be a drop-in
+ * replacement for scala.collection.mutable.HashMap.
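+ *
+ * A minimal usage sketch:
+ * {{{
+ *   val map = new TimeStampedHashMap[String, Int]
+ *   map("a") = 1
+ *   // later: drop entries inserted more than an hour ago
+ *   map.cleanup(System.currentTimeMillis() - 60 * 60 * 1000)
+ * }}}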
+ */
+class TimeStampedHashMap[A, B] extends Map[A, B] {
+  val internalMap = new ConcurrentHashMap[A, (B, Long)]()
+
+  def get(key: A): Option[B] = {
+    val value = internalMap.get(key)
+    if (value != null) Some(value._1) else None
+  }
+
+  def iterator: Iterator[(A, B)] = {
+    val jIterator = internalMap.entrySet().iterator()
+    jIterator.map(kv => (kv.getKey, kv.getValue._1))
+  }
+
+  override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = {
+    val newMap = new TimeStampedHashMap[A, B1]
+    newMap.internalMap.putAll(this.internalMap)
+    newMap.internalMap.put(kv._1, (kv._2, currentTime))
+    newMap
+  }
+
+  override def - (key: A): Map[A, B] = {
+    internalMap.remove(key)
+    this
+  }
+
+  override def += (kv: (A, B)): this.type = {
+    internalMap.put(kv._1, (kv._2, currentTime))
+    this
+  }
+
+  override def -= (key: A): this.type = {
+    internalMap.remove(key)
+    this
+  }
+
+  override def update(key: A, value: B) {
+    this += ((key, value))
+  }
+
+  override def apply(key: A): B = {
+    val value = internalMap.get(key)
+    if (value == null) throw new NoSuchElementException()
+    value._1
+  }
+
+  override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
+    internalMap.map(kv => (kv._1, kv._2._1)).filter(p)
+  }
+
+  override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
+
+  override def size: Int = internalMap.size()
+
+  override def foreach[U](f: ((A, B)) => U): Unit = {
+    val iterator = internalMap.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      val kv = (entry.getKey, entry.getValue._1)
+      f(kv)
+    }
+  }
+
+  def cleanup(threshTime: Long) {
+    val iterator = internalMap.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      if (entry.getValue._2 < threshTime) {
+        iterator.remove()
+      }
+    }
+  }
+
+  private def currentTime: Long = System.currentTimeMillis()
+
+}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index ad2253596df86b4330be66814e732a4332b30734..8f86e3170ed0f1b0af2874621ac5221a822dbcea 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -7,6 +7,10 @@ import akka.actor._
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
 import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.matchers.ShouldMatchers._
+import org.scalatest.time.SpanSugar._
 
 import spark.KryoSerializer
 import spark.SizeEstimator
@@ -20,15 +24,16 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   var oldArch: String = null
   var oldOops: String = null
   var oldHeartBeat: String = null
-  
-  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test 
+
+  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+  System.setProperty("spark.kryoserializer.buffer.mb", "1")
   val serializer = new KryoSerializer
 
   before {
     actorSystem = ActorSystem("test")
-    master = new BlockManagerMaster(actorSystem, true, true)
+    master = new BlockManagerMaster(actorSystem, true, true, "localhost", 7077)
 
-    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case 
+    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     oldArch = System.setProperty("os.arch", "amd64")
     oldOops = System.setProperty("spark.test.useCompressedOops", "true")
     oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
@@ -63,7 +68,33 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     }
   }
 
-  test("manager-master interaction") {
+  test("StorageLevel object caching") {
+    val level1 = new StorageLevel(false, false, false, 3)
+    val level2 = new StorageLevel(false, false, false, 3)
+    val bytes1 = spark.Utils.serialize(level1)
+    val level1_ = spark.Utils.deserialize[StorageLevel](bytes1)
+    val bytes2 = spark.Utils.serialize(level2)
+    val level2_ = spark.Utils.deserialize[StorageLevel](bytes2)
+    assert(level1_ === level1, "Deserialized level1 not same as original level1")
+    assert(level2_ === level2, "Deserialized level2 not same as original level2")
+    assert(level1_ === level2_, "Deserialized level1 not same as deserialized level2")
+    assert(level2_.eq(level1_), "Deserialized level2 not the same object as deserialized level1")
+  }
+
+  test("BlockManagerId object caching") {
+    val id1 = new BlockManagerId("XXX", 1)
+    val id2 = new BlockManagerId("XXX", 1)
+    val bytes1 = spark.Utils.serialize(id1)
+    val id1_ = spark.Utils.deserialize[BlockManagerId](bytes1)
+    val bytes2 = spark.Utils.serialize(id2)
+    val id2_ = spark.Utils.deserialize[BlockManagerId](bytes2)
+    assert(id1_ === id1, "Deserialized id1 not same as original id1")
+    assert(id2_ === id2, "Deserialized id2 not same as original id2")
+    assert(id1_ === id2_, "Deserialized id1 not same as deserialized id2")
+    assert(id2_.eq(id1_), "Deserialized id2 not the same object as deserialized id1")
+  }
+
+  test("master + 1 manager interaction") {
     store = new BlockManager(actorSystem, master, serializer, 2000)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
@@ -74,83 +105,122 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
 
-    // Checking whether blocks are in memory 
+    // Checking whether blocks are in memory
     assert(store.getSingle("a1") != None, "a1 was not in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") != None, "a3 was not in store")
 
     // Checking whether master knows about the blocks or not
-    assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
-    assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
-    assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
-    
+    assert(master.getLocations("a1").size > 0, "master was not told about a1")
+    assert(master.getLocations("a2").size > 0, "master was not told about a2")
+    assert(master.getLocations("a3").size === 0, "master was told about a3")
+
     // Drop a1 and a2 from memory; this should be reported back to the master
     store.dropFromMemory("a1", null)
     store.dropFromMemory("a2", null)
     assert(store.getSingle("a1") === None, "a1 not removed from store")
     assert(store.getSingle("a2") === None, "a2 not removed from store")
-    assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
-    assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
+    assert(master.getLocations("a1").size === 0, "master did not remove a1")
+    assert(master.getLocations("a2").size === 0, "master did not remove a2")
   }
 
-  test("reregistration on heart beat") {
-    val heartBeat = PrivateMethod[Unit]('heartBeat)
+  test("master + 2 managers interaction") {
     store = new BlockManager(actorSystem, master, serializer, 2000)
+    store2 = new BlockManager(actorSystem, master, new KryoSerializer, 2000)
+
+    val peers = master.getPeers(store.blockManagerId, 1)
+    assert(peers.size === 1, "master did not return the other manager as a peer")
+    assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
+
     val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+    store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
+    assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
+    assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
+  }
 
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+  test("removing block") {
+    store = new BlockManager(actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
 
-    assert(store.getSingle("a1") != None, "a1 was not in store")
-    assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+    // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+    store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, false)
 
-    master.notifyADeadHost(store.blockManagerId.ip)
-    assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+    // Checking whether blocks are in memory and memory size
+    val memStatus = master.getMemoryStatus.head._2
+    assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+    assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should be <= 1200")
+    assert(store.getSingle("a1-to-remove") != None, "a1 was not in store")
+    assert(store.getSingle("a2-to-remove") != None, "a2 was not in store")
+    assert(store.getSingle("a3-to-remove") != None, "a3 was not in store")
 
-    store invokePrivate heartBeat()
-    assert(master.mustGetLocations(GetLocations("a1")).size > 0,
-           "a1 was not reregistered with master")
+    // Checking whether master knows about the blocks or not
+    assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
+    assert(master.getLocations("a2-to-remove").size > 0, "master was not told about a2")
+    assert(master.getLocations("a3-to-remove").size === 0, "master was told about a3")
+
+    // Remove a1 and a2 and a3. Should be no-op for a3.
+    master.removeBlock("a1-to-remove")
+    master.removeBlock("a2-to-remove")
+    master.removeBlock("a3-to-remove")
+
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("a1-to-remove") should be (None)
+      master.getLocations("a1-to-remove") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("a2-to-remove") should be (None)
+      master.getLocations("a2-to-remove") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("a3-to-remove") should not be (None)
+      master.getLocations("a3-to-remove") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      val memStatus = master.getMemoryStatus.head._2
+      memStatus._1 should equal (2000L)
+      memStatus._2 should equal (2000L)
+    }
   }
 
-  test("reregistration on block update") {
+  test("reregistration on heart beat") {
+    val heartBeat = PrivateMethod[Unit]('heartBeat)
     store = new BlockManager(actorSystem, master, serializer, 2000)
     val a1 = new Array[Byte](400)
-    val a2 = new Array[Byte](400)
 
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
 
-    assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
     master.notifyADeadHost(store.blockManagerId.ip)
-    assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
-    
-    store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+    assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    assert(master.mustGetLocations(GetLocations("a1")).size > 0,
-        "a1 was not reregistered with master")
-    assert(master.mustGetLocations(GetLocations("a2")).size > 0,
-        "master was not told about a2")
+    store invokePrivate heartBeat()
+    assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
   }
 
-  test("deregistration on duplicate") {
-    val heartBeat = PrivateMethod[Unit]('heartBeat)
+  test("reregistration on block update") {
     store = new BlockManager(actorSystem, master, serializer, 2000)
     val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
 
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
 
-    assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+    assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
-    store2 = new BlockManager(actorSystem, master, serializer, 2000)
-
-    assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+    master.notifyADeadHost(store.blockManagerId.ip)
+    assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    store invokePrivate heartBeat()
-    
-    assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+    store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
 
-    store2 invokePrivate heartBeat()
-    
-    assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master")
+    assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
+    assert(master.getLocations("a2").size > 0, "master was not told about a2")
   }
 
   test("in-memory LRU storage") {
@@ -171,7 +241,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") === None, "a3 was in store")
   }
-  
+
   test("in-memory LRU storage with serialization") {
     store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 2f67bb9921e1c8dc9aa12e2a0b5212a8bbe2c599..34b93fb694478690f695099f936b4414ad386888 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -87,7 +87,7 @@ object SparkBuild extends Build {
 
     libraryDependencies ++= Seq(
       "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011",
-      "org.scalatest" %% "scalatest" % "1.6.1" % "test",
+      "org.scalatest" %% "scalatest" % "1.8" % "test",
       "org.scalacheck" %% "scalacheck" % "1.9" % "test",
       "com.novocode" % "junit-interface" % "0.8" % "test"
     ),