diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 9f2b0c42c74d60bd933240af9fb88d706708a72b..272d7cdad3bc213b1202b5d249015d957319181b 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -88,7 +88,7 @@ object SparkEnv extends Logging {
     val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")
     
     val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
-    val blockManager = new BlockManager(blockManagerMaster, serializer)
+    val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
     
     val connectionManager = blockManager.connectionManager
 
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 8b2a71add53b85c41a7b02044c3089ce9b48500c..4211d805967a9087e19a7f0285d9efdad776913c 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -35,11 +35,15 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)
 
     /* Start the Slaves */
     for (slaveNum <- 1 to numSlaves) {
+      /* We can pretend to test distributed stuff by giving the slaves distinct hostnames.
+         All of 127/8 should be a loopback, we use 127.100.*.* in hopes that it is
+         sufficiently distinctive. */
+      val slaveIpAddress = "127.100.0." + (slaveNum % 256)
       val (actorSystem, boundPort) = 
-        AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
+        AkkaUtils.createActorSystem("sparkWorker" + slaveNum, slaveIpAddress, 0)
       slaveActorSystems += actorSystem
       val actor = actorSystem.actorOf(
-        Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
+        Props(new Worker(slaveIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
         name = "Worker")
       slaveActors += actor
     }
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bf52b510b4fd49616ebc9ca1985c4f9fdb9162d3..4e7d11996f8b2d8db3a00dbbc19e7fc8a2008cad 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,7 +1,9 @@
 package spark.storage
 
+import akka.actor.{ActorSystem, Cancellable}
 import akka.dispatch.{Await, Future}
 import akka.util.Duration
+import akka.util.duration._
 
 import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
 
@@ -12,7 +14,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConversions._
 
-import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils}
+import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
 import spark.network._
 import spark.serializer.Serializer
 import spark.util.ByteBufferInputStream
@@ -45,13 +47,13 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
   }
 }
 
-
 private[spark] 
 case class BlockException(blockId: String, message: String, ex: Exception = null)
 extends Exception(message)
 
 private[spark]
-class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
+class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
+                   val serializer: Serializer, maxMemory: Long)
   extends Logging {
 
   class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@@ -104,15 +106,27 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   // Whether to compress RDD partitions that are stored serialized
   val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
 
+  val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
+
   val host = System.getProperty("spark.hostname", Utils.localHostName())
 
+  @volatile private var shuttingDown = false
+
+  private def heartBeat() {
+    if (!master.mustHeartBeat(HeartBeat(blockManagerId))) {
+      reregister()
+    }
+  }
+
+  var heartBeatTask: Cancellable = null
+
   initialize()
 
   /**
    * Construct a BlockManager with a memory limit set based on system properties.
    */
-  def this(master: BlockManagerMaster, serializer: Serializer) = {
-    this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+  def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = {
+    this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
   }
 
   /**
@@ -123,6 +137,43 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     master.mustRegisterBlockManager(
       RegisterBlockManager(blockManagerId, maxMemory))
     BlockManagerWorker.startBlockManagerWorker(this)
+    if (!BlockManager.getDisableHeartBeatsForTesting) {
+      heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
+        heartBeat()
+      }
+    }
+  }
+
+  /**
+   * Report all blocks to the BlockManager again. This may be necessary if we are dropped
+   * by the BlockManager and come back or if we become capable of recovering blocks on disk after
+   * an executor crash.
+   *
+   * This function deliberately fails silently if the master returns false (indicating that
+   * the slave needs to reregister). The error condition will be detected again by the next
+   * heart beat attempt or new block registration and another try to reregister all blocks
+   * will be made then.
+   */
+  private def reportAllBlocks() {
+    logInfo("Reporting " + blockInfo.size + " blocks to the master.")
+    for (blockId <- blockInfo.keys) {
+      if (!tryToReportBlockStatus(blockId)) {
+        logError("Failed to report " + blockId + " to master; giving up.")
+        return
+      }
+    }
+  }
+
+  /**
+   * Reregister with the master and report all blocks to it. This will be called by the heart beat
+   * thread if our heartbeat to the block amnager indicates that we were not registered.
+   */
+  def reregister() {
+    // TODO: We might need to rate limit reregistering.
+    logInfo("BlockManager reregistering with master")
+    master.mustRegisterBlockManager(
+      RegisterBlockManager(blockManagerId, maxMemory))
+    reportAllBlocks()
   }
 
   /**
@@ -134,12 +185,25 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   }
 
   /**
-   * Tell the master about the current storage status of a block. This will send a heartbeat
+   * Tell the master about the current storage status of a block. This will send a block update
    * message reflecting the current status, *not* the desired storage level in its block info.
    * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
    */
   def reportBlockStatus(blockId: String) {
+    val needReregister = !tryToReportBlockStatus(blockId)
+    if (needReregister) {
+      logInfo("Got told to reregister updating block " + blockId)
+      // Reregistering will report our new block for free.
+      reregister()
+    }
+    logDebug("Told master about block " + blockId)
+  }
 
+  /**
+   * Actually send a BlockUpdate message. Returns the mater's repsonse, which will be true if theo
+   * block was successfully recorded and false if the slave needs to reregister.
+   */
+  private def tryToReportBlockStatus(blockId: String): Boolean = {
     val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
       case null =>
         (StorageLevel.NONE, 0L, 0L)
@@ -159,10 +223,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
           }
         }
     }
-    master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
-    logDebug("Told master about block " + blockId)
+    return master.mustBlockUpdate(
+      BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
   }
 
+
   /**
    * Get locations of the block.
    */
@@ -840,6 +905,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   }
 
   def stop() {
+    if (heartBeatTask != null) {
+      heartBeatTask.cancel()
+    }
     connectionManager.stop()
     blockInfo.clear()
     memoryStore.clear()
@@ -855,6 +923,12 @@ object BlockManager extends Logging {
     (Runtime.getRuntime.maxMemory * memoryFraction).toLong
   }
 
+  def getHeartBeatFrequencyFromSystemProperties: Long =
+    System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong
+
+  def getDisableHeartBeatsForTesting: Boolean =
+    System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+
   /**
    * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
    * might cause errors if one attempts to read from the unmapped buffer, but it's better than
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index ace27e758c4bfb52e17c08537c092c2f83c9c5c3..a7b60fc2cffc2a7f293ba4b072b73ce659ba2cda 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -26,7 +26,10 @@ case class RegisterBlockManager(
   extends ToBlockManagerMaster
 
 private[spark]
-class HeartBeat(
+case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+private[spark]
+class BlockUpdate(
     var blockManagerId: BlockManagerId,
     var blockId: String,
     var storageLevel: StorageLevel,
@@ -57,17 +60,17 @@ class HeartBeat(
 }
 
 private[spark]
-object HeartBeat {
+object BlockUpdate {
   def apply(blockManagerId: BlockManagerId,
       blockId: String,
       storageLevel: StorageLevel,
       memSize: Long,
-      diskSize: Long): HeartBeat = {
-    new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize)
+      diskSize: Long): BlockUpdate = {
+    new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
   }
 
   // For pattern-matching
-  def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+  def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
     Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
   }
 }
@@ -90,6 +93,9 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster
 private[spark]
 case object GetMemoryStatus extends ToBlockManagerMaster
 
+private[spark]
+case object ExpireDeadHosts extends ToBlockManagerMaster
+
 
 private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
 
@@ -105,7 +111,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
       blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
 
     def updateLastSeenMs() {
-      _lastSeenMs = System.currentTimeMillis() / 1000
+      _lastSeenMs = System.currentTimeMillis()
     }
 
     def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
@@ -156,6 +162,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
 
     def lastSeenMs: Long = _lastSeenMs
 
+    def blocks: JHashMap[String, StorageLevel] = _blocks
+
     override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
 
     def clear() {
@@ -164,26 +172,84 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
   }
 
   private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
+  private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
   private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
 
   initLogging()
 
+  val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
+    "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
+
+  val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
+    "5000").toLong
+
+  var timeoutCheckingTask: Cancellable = null
+
+  override def preStart() {
+    if (!BlockManager.getDisableHeartBeatsForTesting) {
+      timeoutCheckingTask = context.system.scheduler.schedule(
+        0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+    }
+    super.preStart()
+  }
+
+  def removeBlockManager(blockManagerId: BlockManagerId) {
+    val info = blockManagerInfo(blockManagerId)
+    blockManagerIdByHost.remove(blockManagerId.ip)
+    blockManagerInfo.remove(blockManagerId)
+    var iterator = info.blocks.keySet.iterator
+    while (iterator.hasNext) {
+      val blockId = iterator.next
+      val locations = blockInfo.get(blockId)._2
+      locations -= blockManagerId
+      if (locations.size == 0) {
+        blockInfo.remove(locations)
+      }
+    }
+  }
+
+  def expireDeadHosts() {
+    logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
+    val now = System.currentTimeMillis()
+    val minSeenTime = now - slaveTimeout
+    val toRemove = new HashSet[BlockManagerId]
+    for (info <- blockManagerInfo.values) {
+      if (info.lastSeenMs < minSeenTime) {
+        logInfo("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
+        toRemove += info.blockManagerId
+      }
+    }
+    // TODO: Remove corresponding block infos
+    toRemove.foreach(removeBlockManager)
+  }
+
   def removeHost(host: String) {
     logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
     logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
-    val ip = host.split(":")(0)
-    val port = host.split(":")(1)
-    blockManagerInfo.remove(new BlockManagerId(ip, port.toInt))
+    blockManagerIdByHost.get(host).foreach(removeBlockManager)
     logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
     sender ! true
   }
 
+  def heartBeat(blockManagerId: BlockManagerId) {
+    if (!blockManagerInfo.contains(blockManagerId)) {
+      if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+        sender ! true
+      } else {
+        sender ! false
+      }
+    } else {
+      blockManagerInfo(blockManagerId).updateLastSeenMs()
+      sender ! true
+    }
+  }
+
   def receive = {
     case RegisterBlockManager(blockManagerId, maxMemSize) =>
       register(blockManagerId, maxMemSize)
 
-    case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
-      heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
+    case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+      blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
 
     case GetLocations(blockId) =>
       getLocations(blockId)
@@ -205,8 +271,17 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
     case StopBlockManagerMaster =>
       logInfo("Stopping BlockManagerMaster")
       sender ! true
+      if (timeoutCheckingTask != null) {
+        timeoutCheckingTask.cancel
+      }
       context.stop(self)
 
+    case ExpireDeadHosts =>
+      expireDeadHosts()
+
+    case HeartBeat(blockManagerId) => 
+      heartBeat(blockManagerId)
+
     case other =>
       logInfo("Got unknown message: " + other)
   }
@@ -223,17 +298,25 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " "
     logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
+    if (blockManagerIdByHost.contains(blockManagerId.ip) &&
+        blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
+      val oldId = blockManagerIdByHost(blockManagerId.ip)
+      logInfo("Got second registration for host " + blockManagerId +
+              "; removing old slave " + oldId)
+      removeBlockManager(oldId)
+    }
     if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
       logInfo("Got Register Msg from master node, don't register it")
     } else {
       blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
-        blockManagerId, System.currentTimeMillis() / 1000, maxMemSize))
+        blockManagerId, System.currentTimeMillis(), maxMemSize))
     }
+    blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
     logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
     sender ! true
   }
 
-  private def heartBeat(
+  private def blockUpdate(
       blockManagerId: BlockManagerId,
       blockId: String,
       storageLevel: StorageLevel,
@@ -244,15 +327,21 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
     val tmp = " " + blockManagerId + " " + blockId + " "
 
     if (!blockManagerInfo.contains(blockManagerId)) {
-      // Can happen if this is from a locally cached partition on the master
-      sender ! true
+      if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+        // We intentionally do not register the master (except in local mode),
+        // so we should not indicate failure.
+        sender ! true
+      } else {
+        sender ! false
+      }
       return
     }
 
     if (blockId == null) {
       blockManagerInfo(blockManagerId).updateLastSeenMs()
-      logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
+      logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
       sender ! true
+      return
     }
 
     blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
@@ -361,7 +450,6 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
   val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost")
   val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt
   val DEFAULT_MANAGER_IP: String = Utils.localHostName()
-  val DEFAULT_MANAGER_PORT: String = "10902"
 
   val timeout = 10.seconds
   var masterActor: ActorRef = null
@@ -405,7 +493,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
   }
 
   def notifyADeadHost(host: String) {
-    communicate(RemoveHost(host + ":" + DEFAULT_MANAGER_PORT))
+    communicate(RemoveHost(host))
     logInfo("Removed " + host + " successfully in notifyADeadHost")
   }
 
@@ -436,27 +524,49 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     }
   }
 
-  def mustHeartBeat(msg: HeartBeat) {
-    while (! syncHeartBeat(msg)) {
-      logWarning("Failed to send heartbeat" + msg)
+  def mustHeartBeat(msg: HeartBeat): Boolean = {
+    var res = syncHeartBeat(msg)
+    while (!res.isDefined) {
+      logWarning("Failed to send heart beat " + msg)
       Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
     }
+    return res.get
   }
 
-  def syncHeartBeat(msg: HeartBeat): Boolean = {
+  def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
+    try {
+      val answer = askMaster(msg).asInstanceOf[Boolean]
+      return Some(answer)
+    } catch {
+      case e: Exception => 
+        logError("Failed in syncHeartBeat", e)
+        return None
+    }
+  }
+
+  def mustBlockUpdate(msg: BlockUpdate): Boolean = {
+    var res = syncBlockUpdate(msg)
+    while (!res.isDefined) {
+      logWarning("Failed to send block update " + msg)
+      Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
+    }
+    return res.get
+  }
+
+  def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " msg " + msg + " "
-    logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
+    logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
 
     try {
-      communicate(msg)
-      logDebug("Heartbeat sent successfully")
-      logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
-      return true
+      val answer = askMaster(msg).asInstanceOf[Boolean]
+      logDebug("Block update sent successfully")
+      logDebug("Got in synbBlockUpdate " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
+      return Some(answer)
     } catch {
       case e: Exception =>
-        logError("Failed in syncHeartBeat", e)
-        return false
+        logError("Failed in syncBlockUpdate", e)
+        return None
     }
   }
 
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index e4a5b8ffdf6953f37d1ef4cb0b49aa09edab2684..5bb5a29cc42243a95c160f2eb62a65a8ba1e2792 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -74,7 +74,7 @@ private[spark] object ThreadingTest {
     val actorSystem = ActorSystem("test")
     val serializer = new KryoSerializer
     val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
-    val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024)
+    val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
     val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
     producers.foreach(_.start)
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index b9c19e61cd1c26d6e505d5175d9a5a9f4e2bd4a6..ad2253596df86b4330be66814e732a4332b30734 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -14,10 +14,12 @@ import spark.util.ByteBufferInputStream
 
 class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
   var store: BlockManager = null
+  var store2: BlockManager = null
   var actorSystem: ActorSystem = null
   var master: BlockManagerMaster = null
   var oldArch: String = null
   var oldOops: String = null
+  var oldHeartBeat: String = null
   
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test 
   val serializer = new KryoSerializer
@@ -29,6 +31,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case 
     oldArch = System.setProperty("os.arch", "amd64")
     oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+    oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
   }
@@ -36,6 +39,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   after {
     if (store != null) {
       store.stop()
+      store = null
+    }
+    if (store2 != null) {
+      store2.stop()
+      store2 = null
     }
     actorSystem.shutdown()
     actorSystem.awaitTermination()
@@ -56,7 +64,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("manager-master interaction") {
-    store = new BlockManager(master, serializer, 2000)
+    store = new BlockManager(actorSystem, master, serializer, 2000)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -85,8 +93,68 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
   }
 
+  test("reregistration on heart beat") {
+    val heartBeat = PrivateMethod[Unit]('heartBeat)
+    store = new BlockManager(actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+    master.notifyADeadHost(store.blockManagerId.ip)
+    assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+
+    store invokePrivate heartBeat()
+    assert(master.mustGetLocations(GetLocations("a1")).size > 0,
+           "a1 was not reregistered with master")
+  }
+
+  test("reregistration on block update") {
+    store = new BlockManager(actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+    assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+    master.notifyADeadHost(store.blockManagerId.ip)
+    assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+    
+    store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+
+    assert(master.mustGetLocations(GetLocations("a1")).size > 0,
+        "a1 was not reregistered with master")
+    assert(master.mustGetLocations(GetLocations("a2")).size > 0,
+        "master was not told about a2")
+  }
+
+  test("deregistration on duplicate") {
+    val heartBeat = PrivateMethod[Unit]('heartBeat)
+    store = new BlockManager(actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+    assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+    store2 = new BlockManager(actorSystem, master, serializer, 2000)
+
+    assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+
+    store invokePrivate heartBeat()
+    
+    assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+    store2 invokePrivate heartBeat()
+    
+    assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master")
+  }
+
   test("in-memory LRU storage") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -105,7 +173,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
   
   test("in-memory LRU storage with serialization") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -124,7 +192,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU for partitions of same RDD") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -143,7 +211,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU for partitions of multiple RDDs") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -166,7 +234,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("on-disk storage") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -179,7 +247,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -194,7 +262,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage with getLocalBytes") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -209,7 +277,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage with serialization") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -224,7 +292,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage with serialization and getLocalBytes") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -239,7 +307,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("LRU with mixed storage levels") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -264,7 +332,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU with streams") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -288,7 +356,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("LRU with mixed storage levels and streams") {
-    store = new BlockManager(master, serializer, 1200)
+    store = new BlockManager(actorSystem, master, serializer, 1200)
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -334,7 +402,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("overly large block") {
-    store = new BlockManager(master, serializer, 500)
+    store = new BlockManager(actorSystem, master, serializer, 500)
     store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingle("a1") === None, "a1 was in store")
     store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -345,49 +413,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   test("block compression") {
     try {
       System.setProperty("spark.shuffle.compress", "true")
-      store = new BlockManager(master, serializer, 2000)
+      store = new BlockManager(actorSystem, master, serializer, 2000)
       store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.shuffle.compress", "false")
-      store = new BlockManager(master, serializer, 2000)
+      store = new BlockManager(actorSystem, master, serializer, 2000)
       store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.broadcast.compress", "true")
-      store = new BlockManager(master, serializer, 2000)
+      store = new BlockManager(actorSystem, master, serializer, 2000)
       store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.broadcast.compress", "false")
-      store = new BlockManager(master, serializer, 2000)
+      store = new BlockManager(actorSystem, master, serializer, 2000)
       store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.rdd.compress", "true")
-      store = new BlockManager(master, serializer, 2000)
+      store = new BlockManager(actorSystem, master, serializer, 2000)
       store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.rdd.compress", "false")
-      store = new BlockManager(master, serializer, 2000)
+      store = new BlockManager(actorSystem, master, serializer, 2000)
       store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
-      store = new BlockManager(master, serializer, 2000)
+      store = new BlockManager(actorSystem, master, serializer, 2000)
       store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
       store.stop()