diff --git a/bin/start-master.sh b/bin/start-master.sh
index a901b1c26068e47ad0eb476aacf4928b0124c0b9..87feb261fe86bb498eedcf40c1d98b3773cf3576 100755
--- a/bin/start-master.sh
+++ b/bin/start-master.sh
@@ -26,7 +26,8 @@ fi
 # Set SPARK_PUBLIC_DNS so the master reports the correct webUI address to the slaves
 if [ "$SPARK_PUBLIC_DNS" = "" ]; then
     # If we appear to be running on EC2, use the public address by default:
-    if [[ `hostname` == *ec2.internal ]]; then
+    # NOTE: ec2-metadata is installed on Amazon Linux AMI. Check based on that and hostname
+    if command -v ec2-metadata > /dev/null || [[ `hostname` == *ec2.internal ]]; then
         export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
     fi
 fi
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index d4f5164f7d9682effd0d8f609030a7e621566ecc..aaf433b324fecc3991f90e6234a94d685f80b217 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -114,7 +114,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isDriver: Boolea
     var array = mapStatuses(shuffleId)
     if (array != null) {
       array.synchronized {
-        if (array(mapId) != null && array(mapId).address == bmAddress) {
+        if (array(mapId) != null && array(mapId).location == bmAddress) {
           array(mapId) = null
         }
       }
@@ -277,7 +277,7 @@ private[spark] object MapOutputTracker {
           throw new FetchFailedException(null, shuffleId, -1, reduceId,
             new Exception("Missing an output location for shuffle " + shuffleId))
         } else {
-          (status.address, decompressSize(status.compressedSizes(reduceId)))
+          (status.location, decompressSize(status.compressedSizes(reduceId)))
         }
     }
   }
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index c79f34342f66ceee3df2aa612dcaaf020fe80703..0d3857f9dd4882d0ac6bda8ff7b2f5858d079033 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -94,6 +94,9 @@ abstract class RDD[T: ClassManifest](
   /** How this RDD depends on any parent RDDs. */
   protected def getDependencies(): List[Dependency[_]] = dependencies_
 
+  /** A friendly name for this RDD */
+  var name: String = null
+
   /** Optionally overridden by subclasses to specify placement preferences. */
   protected def getPreferredLocations(split: Split): Seq[String] = Nil
 
@@ -108,7 +111,13 @@ abstract class RDD[T: ClassManifest](
   /** A unique ID for this RDD (within its SparkContext). */
   val id = sc.newRddId()
 
-  /**
+  /** Assign a name to this RDD */
+  def setName(_name: String) = {
+    name = _name
+    this
+  }
+
+  /** 
    * Set this RDD's storage level to persist its values across operations after the first time
    * it is computed. Can only be called once on each RDD.
    */
@@ -119,6 +128,8 @@ abstract class RDD[T: ClassManifest](
         "Cannot change storage level of an RDD after it was already assigned a level")
     }
     storageLevel = newLevel
+    // Register the RDD with the SparkContext
+    sc.persistentRdds(id) = this
     this
   }
 
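
For context, a minimal sketch of how the new name field and the persist-time registration in RDD.scala fit together (assuming a SparkContext named sc; the data and storage level here are arbitrary):

    import spark.storage.StorageLevel

    val nums = sc.parallelize(1 to 1000)
    nums.setName("nums").persist(StorageLevel.DISK_ONLY)
    // persist() now also records the RDD in sc.persistentRdds under nums.id,
    // so the SparkContext can track (and eventually clean up) persisted RDDs.
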
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index d4991cb1e099278436a0f333fba4b4456a7141dd..1ac262522cdcf3ee90919380a23ddbc588455345 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -1,6 +1,7 @@
 package spark
 
 import java.io._
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
 import java.net.{URI, URLClassLoader}
 import java.lang.ref.WeakReference
@@ -43,6 +44,8 @@ import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import storage.BlockManagerUI
+import util.{MetadataCleaner, TimeStampedHashMap}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -78,16 +81,27 @@ class SparkContext(
 
   // Create the Spark execution environment (cache, map output tracker, etc)
   private[spark] val env = SparkEnv.createFromSystemProperties(
+    "<driver>",
     System.getProperty("spark.driver.host"),
     System.getProperty("spark.driver.port").toInt,
     true,
     isLocal)
   SparkEnv.set(env)
 
+  // Start the BlockManager UI
+  private[spark] val ui = new BlockManagerUI(
+    env.actorSystem, env.blockManager.master.driverActor, this)
+  ui.start()
+
   // Used to store a URL for each static file/jar together with the file's local timestamp
   private[spark] val addedFiles = HashMap[String, Long]()
   private[spark] val addedJars = HashMap[String, Long]()
 
+  // Keeps track of all persisted RDDs
+  private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
+  private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
+
+
   // Add each JAR given through the constructor
   jars.foreach { addJar(_) }
 
@@ -493,6 +507,7 @@ class SparkContext(
   /** Shut down the SparkContext. */
   def stop() {
     if (dagScheduler != null) {
+      metadataCleaner.cancel()
       dagScheduler.stop()
       dagScheduler = null
       taskScheduler = null
@@ -635,6 +650,11 @@ class SparkContext(
 
   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
+
+  /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
+  private[spark] def cleanup(cleanupTime: Long) {
+    persistentRdds.clearOldValues(cleanupTime)
+  }
 }
 
 /**
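
The cleanup hook above assumes spark.util.MetadataCleaner periodically calls back with a cutoff time, and that TimeStampedHashMap.clearOldValues drops entries stamped before that cutoff. A rough, timer-based sketch of that contract (illustrative only; the period and the real MetadataCleaner implementation may differ):

    import java.util.{Timer, TimerTask}

    class SimpleMetadataCleaner(name: String, cleanupFunc: Long => Unit) {
      private val delaySeconds = 3600  // illustrative period, not necessarily Spark's default
      private val timer = new Timer(name + " cleanup timer", true)
      timer.schedule(new TimerTask {
        def run() { cleanupFunc(System.currentTimeMillis() - delaySeconds * 1000) }
      }, delaySeconds * 1000L, delaySeconds * 1000L)
      def cancel() { timer.cancel() }
    }
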
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 4034af610c5c0ed90e9e3b28bcf5bcdb9ca488b5..d2193ae72b3d09180421a72096a9bd5f9553b3c2 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -19,6 +19,7 @@ import spark.util.AkkaUtils
  * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
  */
 class SparkEnv (
+    val executorId: String,
     val actorSystem: ActorSystem,
     val serializer: Serializer,
     val closureSerializer: Serializer,
@@ -58,11 +59,12 @@ object SparkEnv extends Logging {
   }
 
   def createFromSystemProperties(
+      executorId: String,
       hostname: String,
       port: Int,
       isDriver: Boolean,
-      isLocal: Boolean
-    ) : SparkEnv = {
+      isLocal: Boolean): SparkEnv = {
+
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
 
     // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
@@ -86,7 +88,7 @@ object SparkEnv extends Logging {
     val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
     val blockManagerMaster = new BlockManagerMaster(
       actorSystem, isDriver, isLocal, driverIp, driverPort)
-    val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
+    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
 
     val connectionManager = blockManager.connectionManager
 
@@ -122,6 +124,7 @@ object SparkEnv extends Logging {
     }
 
     new SparkEnv(
+      executorId,
       actorSystem,
       serializer,
       closureSerializer,
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index ae7726437250a137d2e214978e275e56207dc74b..1e58d012731b5888e789bf59d2b3eef1e5ee566f 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -1,7 +1,7 @@
 package spark
 
 import java.io._
-import java.net.{NetworkInterface, InetAddress, Inet4Address, URL, URI}
+import java.net._
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
 import org.apache.hadoop.conf.Configuration
@@ -11,6 +11,7 @@ import scala.collection.JavaConversions._
 import scala.io.Source
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import scala.Some
 
 /**
  * Various utility methods used by Spark.
@@ -431,4 +432,18 @@ private object Utils extends Logging {
     }
     "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
   }
+
+  /**
+   * Try to find a free port to bind to on the local host. This should ideally never be needed,
+   * except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray)
+   * don't let users bind to port 0 and then figure out which free port they actually bound to.
+ * We work around this by binding a ServerSocket on port 0 and immediately closing it. This is *not*
+   * necessarily guaranteed to work, but it's the best we can do.
+   */
+  def findFreePort(): Int = {
+    val socket = new ServerSocket(0)
+    val portBound = socket.getLocalPort
+    socket.close()
+    portBound
+  }
 }
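
A hypothetical caller inside the spark package (Utils is private to that package) would use findFreePort like this; note the window between finding the port and binding it, which is exactly the caveat the scaladoc warns about:

    val port = Utils.findFreePort()
    val server = new java.net.ServerSocket(port)  // may, rarely, fail if the port was just taken
    try {
      // ... serve on port ...
    } finally {
      server.close()
    }
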
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index b3698ffa44d57f192c052e03d5c6551c1b9a2fdf..4c95c989b53676c54e9c941b0892ff08b189a734 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -12,7 +12,7 @@ import spark.storage.StorageLevel
 import com.google.common.base.Optional
 
 
-trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
+trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround[T] {
   def wrapRDD(rdd: RDD[T]): This
 
   implicit val classManifest: ClassManifest[T]
@@ -82,10 +82,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
-   *  Return a new RDD by first applying a function to all elements of this
-   *  RDD, and then flattening the results.
+   * Part of the workaround for SPARK-668; called in PairFlatMapWorkaround.java.
    */
-  def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
+  private[spark] def doFlatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
     def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
diff --git a/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java b/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java
new file mode 100644
index 0000000000000000000000000000000000000000..68b6fd6622742148761363045c6a06d0e8afeb74
--- /dev/null
+++ b/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java
@@ -0,0 +1,20 @@
+package spark.api.java;
+
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaRDDLike;
+import spark.api.java.function.PairFlatMapFunction;
+
+import java.io.Serializable;
+
+/**
+ * Workaround for SPARK-668.
+ */
+class PairFlatMapWorkaround<T> implements Serializable {
+    /**
+     *  Return a new RDD by first applying a function to all elements of this
+     *  RDD, and then flattening the results.
+     */
+    public <K, V> JavaPairRDD<K, V> flatMap(PairFlatMapFunction<T, K, V> f) {
+        return ((JavaRDDLike<T, ?>) this).doFlatMap(f);
+    }
+}
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index ae083efc8db6ee8c46dfed4b5df4e03e91e78801..2836574ecb23bb50de012b15d7d7f8f1c54c22a8 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -9,6 +9,12 @@ import spark.{Logging, Utils}
 
 import scala.collection.mutable.ArrayBuffer
 
+/**
+ * Testing class that creates a Spark standalone cluster in-process (that is, running the
+ * spark.deploy.master.Master and spark.deploy.worker.Workers in the same JVMs). Executors launched
+ * by the Workers still run in separate JVMs. This can be used to test distributed operation and
+ * fault recovery without spinning up a lot of processes.
+ */
 private[spark]
 class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
   
@@ -32,18 +38,15 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
     masterActor = masterActorSystem.actorOf(
       Props(new Master(localIpAddress, masterPort, 0)), name = "Master")
 
-    /* Start the Workers */
+    /* Start the Slaves */
     for (workerNum <- 1 to numWorkers) {
-      /* We can pretend to test distributed stuff by giving the workers distinct hostnames.
-         All of 127/8 should be a loopback, we use 127.100.*.* in hopes that it is
-         sufficiently distinctive. */
-      val workerIpAddress = "127.100.0." + (workerNum % 256)
       val (actorSystem, boundPort) = 
-        AkkaUtils.createActorSystem("sparkWorker" + workerNum, workerIpAddress, 0)
+        AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0)
       workerActorSystems += actorSystem
-      workerActors += actorSystem.actorOf(
-        Props(new Worker(workerIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)),
-        name = "Worker")
+      val actor = actorSystem.actorOf(
+        Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)),
+        name = "Worker")
+      workerActors += actor
     }
 
     return masterUrl
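
A hypothetical test-side usage of LocalSparkCluster; the start()/stop() method names are assumed here, since only the constructor and the returned master URL are visible in this hunk:

    val cluster = new LocalSparkCluster(numWorkers = 2, coresPerWorker = 1, memoryPerWorker = 512)
    val masterUrl = cluster.start()    // assumed to return the spark://... URL built above
    val sc = new SparkContext(masterUrl, "local-cluster-test")
    try {
      assert(sc.parallelize(1 to 100, 4).count() == 100)
    } finally {
      sc.stop()
      cluster.stop()                   // assumed shutdown hook
    }
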
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 3347207c6dd5891f78e2048f86c4dac6ded16217..bc53b70015aa7d44918b1c04159e49c4d218299b 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -97,10 +97,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
             exec.worker.removeExecutor(exec)
 
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) {
+            if (jobInfo.incrementRetryCount < JobState.MAX_NUM_RETRY) {
               schedule()
             } else {
-              val e = new SparkException("Job %s wth ID %s failed %d times.".format(
+              val e = new SparkException("Job %s with ID %s failed %d times.".format(
                 jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
               logError(e.getMessage, e)
               throw e
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 458ee2d66589c1f910f9678ffb2eb8d6a2b23a27..a01774f511007b091cee4a31f847965944815e8d 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -14,12 +14,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
 import spark.deploy._
 import spark.deploy.JsonProtocol._
 
+/**
+ * Web UI server for the standalone master.
+ */
 private[spark]
 class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
   val RESOURCE_DIR = "spark/deploy/master/webui"
   val STATIC_RESOURCE_DIR = "spark/deploy/static"
   
-  implicit val timeout = Timeout(1 seconds)
+  implicit val timeout = Timeout(10 seconds)
   
   val handler = {
     get {
@@ -76,5 +79,4 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
       getFromResourceDirectory(RESOURCE_DIR)
     }
   }
-
 }
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 0d1fe2a6b497093a5258aab25a7d5ca6ccb0ff44..f5ff267d44fd8e3fef0cb37aebdbf39abb22d39b 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -65,9 +65,9 @@ private[spark] class ExecutorRunner(
     }
   }
 
-  /** Replace variables such as {{SLAVEID}} and {{CORES}} in a command argument passed to us */
+  /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
   def substituteVariables(argument: String): String = argument match {
-    case "{{SLAVEID}}" => workerId
+    case "{{EXECUTOR_ID}}" => execId.toString
     case "{{HOSTNAME}}" => hostname
     case "{{CORES}}" => cores.toString
     case other => other
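
A standalone sketch of the template expansion described above; the real ExecutorRunner presumably maps substituteVariables over the command's arguments before launching the executor process:

    def expand(command: Seq[String], execId: Int, host: String, cores: Int): Seq[String] =
      command.map {
        case "{{EXECUTOR_ID}}" => execId.toString
        case "{{HOSTNAME}}"    => host
        case "{{CORES}}"       => cores.toString
        case other             => other
      }

    // expand(Seq("<driverUrl>", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}"), 7, "host-1", 4)
    //   == Seq("<driverUrl>", "7", "host-1", "4")
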
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index f9489d99fc13fe2e70a9cda105fd22ce9fa6e3b4..ef81f072a308272ae1b1dbb6f70eb0ba794df5b4 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -13,12 +13,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
 import spark.deploy.{WorkerState, RequestWorkerState}
 import spark.deploy.JsonProtocol._
 
+/**
+ * Web UI server for the standalone worker.
+ */
 private[spark]
 class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
   val RESOURCE_DIR = "spark/deploy/worker/webui"
   val STATIC_RESOURCE_DIR = "spark/deploy/static"
   
-  implicit val timeout = Timeout(1 seconds)
+  implicit val timeout = Timeout(10 seconds)
   
   val handler = {
     get {
@@ -50,5 +53,4 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
       getFromResourceDirectory(RESOURCE_DIR)
     }
   }
-  
 }
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 28d9d40d43b92f314db197753b75f3658bb01cb0..bd21ba719a77cf2eaca6b2f70002f191daca4653 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -30,7 +30,7 @@ private[spark] class Executor extends Logging {
 
   initLogging()
 
-  def initialize(slaveHostname: String, properties: Seq[(String, String)]) {
+  def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
     // Make sure the local hostname we report matches the cluster scheduler's name for this host
     Utils.setCustomHostname(slaveHostname)
 
@@ -64,7 +64,7 @@ private[spark] class Executor extends Logging {
     )
 
     // Initialize Spark environment (using system properties read above)
-    env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
+    env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
     SparkEnv.set(env)
 
     // Start worker thread pool
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index eeab3959c662896c1148ab06bec7519d27040ffd..1ef88075ad1e27f5b3ef6ae0e8ab0165c81d524e 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -29,9 +29,10 @@ private[spark] class MesosExecutorBackend(executor: Executor)
       executorInfo: ExecutorInfo,
       frameworkInfo: FrameworkInfo,
       slaveInfo: SlaveInfo) {
+    logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
-    executor.initialize(slaveInfo.getHostname, properties)
+    executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties)
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index f80f1b527417687a1bc56894937a9cc66c5b52b5..e45288ff5300aa1b7edf1983576c53b972caf8d4 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -8,16 +8,16 @@ import akka.actor.{ActorRef, Actor, Props}
 import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
 import akka.remote.RemoteClientLifeCycleEvent
 import spark.scheduler.cluster._
-import spark.scheduler.cluster.RegisteredSlave
+import spark.scheduler.cluster.RegisteredExecutor
 import spark.scheduler.cluster.LaunchTask
-import spark.scheduler.cluster.RegisterSlaveFailed
-import spark.scheduler.cluster.RegisterSlave
+import spark.scheduler.cluster.RegisterExecutorFailed
+import spark.scheduler.cluster.RegisterExecutor
 
 
 private[spark] class StandaloneExecutorBackend(
     executor: Executor,
     driverUrl: String,
-    workerId: String,
+    executorId: String,
     hostname: String,
     cores: Int)
   extends Actor
@@ -30,7 +30,7 @@ private[spark] class StandaloneExecutorBackend(
     try {
       logInfo("Connecting to driver: " + driverUrl)
       driver = context.actorFor(driverUrl)
-      driver ! RegisterSlave(workerId, hostname, cores)
+      driver ! RegisterExecutor(executorId, hostname, cores)
       context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
       context.watch(driver) // Doesn't work with remote actors, but useful for testing
     } catch {
@@ -41,11 +41,11 @@ private[spark] class StandaloneExecutorBackend(
   }
 
   override def receive = {
-    case RegisteredSlave(sparkProperties) =>
+    case RegisteredExecutor(sparkProperties) =>
       logInfo("Successfully registered with driver")
-      executor.initialize(hostname, sparkProperties)
+      executor.initialize(executorId, hostname, sparkProperties)
 
-    case RegisterSlaveFailed(message) =>
+    case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
       System.exit(1)
 
@@ -55,24 +55,24 @@ private[spark] class StandaloneExecutorBackend(
   }
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
-    driver ! StatusUpdate(workerId, taskId, state, data)
+    driver ! StatusUpdate(executorId, taskId, state, data)
   }
 }
 
 private[spark] object StandaloneExecutorBackend {
-  def run(driverUrl: String, workerId: String, hostname: String, cores: Int) {
+  def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
     // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
     // before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
     val actor = actorSystem.actorOf(
-      Props(new StandaloneExecutorBackend(new Executor, driverUrl, workerId, hostname, cores)),
+      Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
       name = "Executor")
     actorSystem.awaitTermination()
   }
 
   def main(args: Array[String]) {
     if (args.length != 4) {
-      System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <workerId> <hostname> <cores>")
+      System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores>")
       System.exit(1)
     }
     run(args(0), args(1), args(2), args(3).toInt)
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index b320be8863a2f137ad85641161554bb804889eae..bd541d420795318e289931161ed4e10e13a8bcd6 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -35,12 +35,12 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     eventQueue.put(CompletionEvent(task, reason, result, accumUpdates))
   }
 
-  // Called by TaskScheduler when a host fails.
-  override def hostLost(host: String) {
-    eventQueue.put(HostLost(host))
+  // Called by TaskScheduler when an executor fails.
+  override def executorLost(execId: String) {
+    eventQueue.put(ExecutorLost(execId))
   }
 
-  // Called by TaskScheduler to cancel an entier TaskSet due to repeated failures.
+  // Called by TaskScheduler to cancel an entire TaskSet due to repeated failures.
   override def taskSetFailed(taskSet: TaskSet, reason: String) {
     eventQueue.put(TaskSetFailed(taskSet, reason))
   }
@@ -54,8 +54,6 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   // resubmit failed stages
   val POLL_TIMEOUT = 10L
 
-  private val lock = new Object          // Used for access to the entire DAGScheduler
-
   private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
 
   val nextRunId = new AtomicInteger(0)
@@ -74,7 +72,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
 
   // For tracking failed nodes, we use the MapOutputTracker's generation number, which is
   // sent with every task. When we detect a node failing, we note the current generation number
-  // and failed host, increment it for new tasks, and use this to ignore stray ShuffleMapTask
+  // and failed executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask
   // results.
   // TODO: Garbage collect information about failure generations when we know there are no more
   //       stray messages to detect.
@@ -110,7 +108,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   }
 
   def clearCacheLocs() {
-    cacheLocs.clear
+    cacheLocs.clear()
   }
 
   /**
@@ -273,8 +271,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
             submitStage(finalStage)
           }
 
-        case HostLost(host) =>
-          handleHostLost(host)
+        case ExecutorLost(execId) =>
+          handleExecutorLost(execId)
 
         case completion: CompletionEvent =>
           handleTaskCompletion(completion)
@@ -337,9 +335,12 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
           val rdd = job.finalStage.rdd
           val split = rdd.splits(job.partitions(0))
           val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0)
-          val result = job.func(taskContext, rdd.iterator(split, taskContext))
-          taskContext.executeOnCompleteCallbacks()
-          job.listener.taskSucceeded(0, result)
+          try {
+            val result = job.func(taskContext, rdd.iterator(split, taskContext))
+            job.listener.taskSucceeded(0, result)
+          } finally {
+            taskContext.executeOnCompleteCallbacks()
+          }
         } catch {
           case e: Exception =>
             job.listener.jobFailed(e)
@@ -435,10 +436,10 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
           case smt: ShuffleMapTask =>
             val stage = idToStage(smt.stageId)
             val status = event.result.asInstanceOf[MapStatus]
-            val host = status.address.ip
-            logInfo("ShuffleMapTask finished with host " + host)
-            if (failedGeneration.contains(host) && smt.generation <= failedGeneration(host)) {
-              logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + host)
+            val execId = status.location.executorId
+            logDebug("ShuffleMapTask finished on " + execId)
+            if (failedGeneration.contains(execId) && smt.generation <= failedGeneration(execId)) {
+              logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
             } else {
               stage.addOutputLoc(smt.partition, status)
             }
@@ -510,9 +511,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
         // Remember that a fetch failed now; this is used to resubmit the broken
         // stages later, after a small wait (to give other tasks the chance to fail)
         lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
-        // TODO: mark the host as failed only if there were lots of fetch failures on it
+        // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          handleHostLost(bmAddress.ip, Some(task.generation))
+          handleExecutorLost(bmAddress.executorId, Some(task.generation))
         }
 
       case other =>
@@ -522,21 +523,21 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   }
 
   /**
-   * Responds to a host being lost. This is called inside the event loop so it assumes that it can
-   * modify the scheduler's internal state. Use hostLost() to post a host lost event from outside.
+   * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
+   * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
    *
    * Optionally the generation during which the failure was caught can be passed to avoid allowing
    * stray fetch failures from possibly retriggering the detection of a node as lost.
    */
-  def handleHostLost(host: String, maybeGeneration: Option[Long] = None) {
+  def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) {
     val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration)
-    if (!failedGeneration.contains(host) || failedGeneration(host) < currentGeneration) {
-      failedGeneration(host) = currentGeneration
-      logInfo("Host lost: " + host + " (generation " + currentGeneration + ")")
-      env.blockManager.master.notifyADeadHost(host)
+    if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) {
+      failedGeneration(execId) = currentGeneration
+      logInfo("Executor lost: %s (generation %d)".format(execId, currentGeneration))
+      env.blockManager.master.removeExecutor(execId)
       // TODO: This will be really slow if we keep accumulating shuffle map stages
       for ((shuffleId, stage) <- shuffleToMapStage) {
-        stage.removeOutputsOnHost(host)
+        stage.removeOutputsOnExecutor(execId)
         val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
         mapOutputTracker.registerMapOutputs(shuffleId, locs, true)
       }
@@ -545,7 +546,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       }
       clearCacheLocs()
     } else {
-      logDebug("Additional host lost message for " + host +
+      logDebug("Additional executor lost message for " + execId +
                "(generation " + currentGeneration + ")")
     }
   }
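
The generation bookkeeping above boils down to a single predicate; a minimal restatement of the check used when a ShuffleMapTask completes (as in handleTaskCompletion above):

    def isStale(execId: String, taskGeneration: Long,
                failedGeneration: collection.Map[String, Long]): Boolean =
      failedGeneration.contains(execId) && taskGeneration <= failedGeneration(execId)

    // A map output is ignored if the task's generation is no newer than the executor's
    // recorded failure generation, so results from a lost executor cannot re-register
    // stale shuffle locations.
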
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 3422a21d9d6a9664d9f236ab30c1213970ac50de..b34fa78c072c0ccdb2c304aec72fd3464169ae72 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -28,7 +28,7 @@ private[spark] case class CompletionEvent(
     accumUpdates: Map[Long, Any])
   extends DAGSchedulerEvent
 
-private[spark] case class HostLost(host: String) extends DAGSchedulerEvent
+private[spark] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
 
 private[spark] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
 
diff --git a/core/src/main/scala/spark/scheduler/MapStatus.scala b/core/src/main/scala/spark/scheduler/MapStatus.scala
index fae643f3a8247916234b66a889a4da2655e55f2a..203abb917be1cbe25f9998d1f65815b529efebdf 100644
--- a/core/src/main/scala/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/spark/scheduler/MapStatus.scala
@@ -8,19 +8,19 @@ import java.io.{ObjectOutput, ObjectInput, Externalizable}
  * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
  * The map output sizes are compressed using MapOutputTracker.compressSize.
  */
-private[spark] class MapStatus(var address: BlockManagerId, var compressedSizes: Array[Byte])
+private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
   extends Externalizable {
 
   def this() = this(null, null)  // For deserialization only
 
   def writeExternal(out: ObjectOutput) {
-    address.writeExternal(out)
+    location.writeExternal(out)
     out.writeInt(compressedSizes.length)
     out.write(compressedSizes)
   }
 
   def readExternal(in: ObjectInput) {
-    address = BlockManagerId(in)
+    location = BlockManagerId(in)
     compressedSizes = new Array[Byte](in.readInt())
     in.readFully(compressedSizes)
   }
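
Given the compressSize/decompressSize helpers used elsewhere in this patch (assumed to map a Long size to a single Byte and back), a MapStatus keeps the per-reducer output sizes to one byte each, at the cost of precision:

    val compressed: Byte = MapOutputTracker.compressSize(128L * 1024 * 1024)
    val approxSize: Long = MapOutputTracker.decompressSize(compressed)
    // approxSize is only an approximation of the original 128 MB, which is all the
    // reduce side needs when planning its fetches.
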
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 19f5328eee087774dfa50b48527e0fa905ea89d0..83641a2a8427b1d86551be7de56edaa8d4ec669a 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -81,7 +81,7 @@ private[spark] class ShuffleMapTask(
   with Externalizable
   with Logging {
 
-  def this() = this(0, null, null, 0, null)
+  protected def this() = this(0, null, null, 0, null)
 
   var split = if (rdd == null) {
     null
@@ -117,34 +117,34 @@ private[spark] class ShuffleMapTask(
 
   override def run(attemptId: Long): MapStatus = {
     val numOutputSplits = dep.partitioner.numPartitions
-    val partitioner = dep.partitioner
 
     val taskContext = new TaskContext(stageId, partition, attemptId)
+    try {
+      // Partition the map output.
+      val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
+      for (elem <- rdd.iterator(split, taskContext)) {
+        val pair = elem.asInstanceOf[(Any, Any)]
+        val bucketId = dep.partitioner.getPartition(pair._1)
+        buckets(bucketId) += pair
+      }
+      val bucketIterators = buckets.map(_.iterator)
 
-    // Partition the map output.
-    val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
-    for (elem <- rdd.iterator(split, taskContext)) {
-      val pair = elem.asInstanceOf[(Any, Any)]
-      val bucketId = partitioner.getPartition(pair._1)
-      buckets(bucketId) += pair
-    }
-    val bucketIterators = buckets.map(_.iterator)
+      val compressedSizes = new Array[Byte](numOutputSplits)
 
-    val compressedSizes = new Array[Byte](numOutputSplits)
+      val blockManager = SparkEnv.get.blockManager
+      for (i <- 0 until numOutputSplits) {
+        val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
+        // Get an iterator over the pairs in this bucket
+        val iter: Iterator[(Any, Any)] = bucketIterators(i)
+        val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
+        compressedSizes(i) = MapOutputTracker.compressSize(size)
+      }
 
-    val blockManager = SparkEnv.get.blockManager
-    for (i <- 0 until numOutputSplits) {
-      val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
-      // Get a Scala iterator from Java map
-      val iter: Iterator[(Any, Any)] = bucketIterators(i)
-      val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
-      compressedSizes(i) = MapOutputTracker.compressSize(size)
+      return new MapStatus(blockManager.blockManagerId, compressedSizes)
+    } finally {
+      // Execute the callbacks on task completion.
+      taskContext.executeOnCompleteCallbacks()
     }
-
-    // Execute the callbacks on task completion.
-    taskContext.executeOnCompleteCallbacks()
-
-    return new MapStatus(blockManager.blockManagerId, compressedSizes)
   }
 
   override def preferredLocations: Seq[String] = locs
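
For reference, the map-side bucketing that run() now wraps in try/finally, reduced to a standalone sketch with the Partitioner replaced by a plain function:

    import scala.collection.mutable.ArrayBuffer

    def bucketize(records: Iterator[(Any, Any)], numOutputSplits: Int,
                  getPartition: Any => Int): Array[ArrayBuffer[(Any, Any)]] = {
      val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
      for (pair <- records) {
        buckets(getPartition(pair._1)) += pair  // one bucket per reduce partition
      }
      buckets
    }
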
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 4846b6672930367e3b5983ecb5dc7f5399fdf1bd..e9419728e3f34c98711703772c0b528ffb611ebd 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -51,18 +51,18 @@ private[spark] class Stage(
 
   def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
     val prevList = outputLocs(partition)
-    val newList = prevList.filterNot(_.address == bmAddress)
+    val newList = prevList.filterNot(_.location == bmAddress)
     outputLocs(partition) = newList
     if (prevList != Nil && newList == Nil) {
       numAvailableOutputs -= 1
     }
   }
  
-  def removeOutputsOnHost(host: String) {
+  def removeOutputsOnExecutor(execId: String) {
     var becameUnavailable = false
     for (partition <- 0 until numPartitions) {
       val prevList = outputLocs(partition)
-      val newList = prevList.filterNot(_.address.ip == host)
+      val newList = prevList.filterNot(_.location.executorId == execId)
       outputLocs(partition) = newList
       if (prevList != Nil && newList == Nil) {
         becameUnavailable = true
@@ -70,7 +70,8 @@ private[spark] class Stage(
       }
     }
     if (becameUnavailable) {
-      logInfo("%s is now unavailable on %s (%d/%d, %s)".format(this, host, numAvailableOutputs, numPartitions, isAvailable))
+      logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
+        this, execId, numAvailableOutputs, numPartitions, isAvailable))
     }
   }
 
@@ -82,7 +83,7 @@ private[spark] class Stage(
 
   def origin: String = rdd.origin
 
-  override def toString = "Stage " + id // + ": [RDD = " + rdd.id + ", isShuffle = " + isShuffleMap + "]"
+  override def toString = "Stage " + id
 
   override def hashCode(): Int = id
 }
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
index fa4de15d0da605c8252f26163af0374fe9a8746d..9fcef86e46a29673133f2884defdde81b8caf9b9 100644
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
@@ -12,7 +12,7 @@ private[spark] trait TaskSchedulerListener {
   def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any]): Unit
 
   // A node was lost from the cluster.
-  def hostLost(host: String): Unit
+  def executorLost(execId: String): Unit
 
   // The TaskScheduler wants to abort an entire task set.
   def taskSetFailed(taskSet: TaskSet, reason: String): Unit
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index a639b72795fe69a9e39164f6f8b716cc05985275..0b4177805b37412613f2e3dec50a755f77d6c7b4 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -27,19 +27,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
 
   val taskIdToTaskSetId = new HashMap[Long, String]
-  val taskIdToSlaveId = new HashMap[Long, String]
+  val taskIdToExecutorId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]
 
   // Incrementing Mesos task IDs
   val nextTaskId = new AtomicLong(0)
 
-  // Which hosts in the cluster are alive (contains hostnames)
-  val hostsAlive = new HashSet[String]
+  // Which executor IDs we have executors on
+  val activeExecutorIds = new HashSet[String]
 
-  // Which slave IDs we have executors on
-  val slaveIdsWithExecutors = new HashSet[String]
+  // The set of executors we have on each host; this is used to compute hostsAlive, which
+  // in turn is used to decide when we can attain data locality on a given host
+  val executorsByHost = new HashMap[String, HashSet[String]]
 
-  val slaveIdToHost = new HashMap[String, String]
+  val executorIdToHost = new HashMap[String, String]
 
   // JAR server, if any JARs were added by the user to the SparkContext
   var jarServer: HttpServer = null
@@ -102,7 +103,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       activeTaskSets -= manager.taskSet.id
       activeTaskSetsQueue -= manager
       taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
-      taskIdToSlaveId --= taskSetTaskIds(manager.taskSet.id)
+      taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
       taskSetTaskIds.remove(manager.taskSet.id)
     }
   }
@@ -117,8 +118,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       SparkEnv.set(sc.env)
       // Mark each slave as alive and remember its hostname
       for (o <- offers) {
-        slaveIdToHost(o.slaveId) = o.hostname
-        hostsAlive += o.hostname
+        executorIdToHost(o.executorId) = o.hostname
       }
       // Build a list of tasks to assign to each slave
       val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
@@ -128,16 +128,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         do {
           launchedTask = false
           for (i <- 0 until offers.size) {
-            val sid = offers(i).slaveId
+            val execId = offers(i).executorId
             val host = offers(i).hostname
-            manager.slaveOffer(sid, host, availableCpus(i)) match {
+            manager.slaveOffer(execId, host, availableCpus(i)) match {
               case Some(task) =>
                 tasks(i) += task
                 val tid = task.taskId
                 taskIdToTaskSetId(tid) = manager.taskSet.id
                 taskSetTaskIds(manager.taskSet.id) += tid
-                taskIdToSlaveId(tid) = sid
-                slaveIdsWithExecutors += sid
+                taskIdToExecutorId(tid) = execId
+                activeExecutorIds += execId
+                if (!executorsByHost.contains(host)) {
+                  executorsByHost(host) = new HashSet()
+                }
+                executorsByHost(host) += execId
                 availableCpus(i) -= 1
                 launchedTask = true
 
@@ -152,25 +156,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var taskSetToUpdate: Option[TaskSetManager] = None
-    var failedHost: Option[String] = None
+    var failedExecutor: Option[String] = None
     var taskFailed = false
     synchronized {
       try {
-        if (state == TaskState.LOST && taskIdToSlaveId.contains(tid)) {
-          // We lost the executor on this slave, so remember that it's gone
-          val slaveId = taskIdToSlaveId(tid)
-          val host = slaveIdToHost(slaveId)
-          if (hostsAlive.contains(host)) {
-            slaveIdsWithExecutors -= slaveId
-            hostsAlive -= host
-            activeTaskSetsQueue.foreach(_.hostLost(host))
-            failedHost = Some(host)
+        if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
+          // We lost this entire executor, so remember that it's gone
+          val execId = taskIdToExecutorId(tid)
+          if (activeExecutorIds.contains(execId)) {
+            removeExecutor(execId)
+            failedExecutor = Some(execId)
           }
         }
         taskIdToTaskSetId.get(tid) match {
           case Some(taskSetId) =>
             if (activeTaskSets.contains(taskSetId)) {
-              //activeTaskSets(taskSetId).statusUpdate(status)
               taskSetToUpdate = Some(activeTaskSets(taskSetId))
             }
             if (TaskState.isFinished(state)) {
@@ -178,7 +178,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
               if (taskSetTaskIds.contains(taskSetId)) {
                 taskSetTaskIds(taskSetId) -= tid
               }
-              taskIdToSlaveId.remove(tid)
+              taskIdToExecutorId.remove(tid)
             }
             if (state == TaskState.FAILED) {
               taskFailed = true
@@ -190,12 +190,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         case e: Exception => logError("Exception in statusUpdate", e)
       }
     }
-    // Update the task set and DAGScheduler without holding a lock on this, because that can deadlock
+    // Update the task set and DAGScheduler without holding a lock on this, since that can deadlock
     if (taskSetToUpdate != None) {
       taskSetToUpdate.get.statusUpdate(tid, state, serializedData)
     }
-    if (failedHost != None) {
-      listener.hostLost(failedHost.get)
+    if (failedExecutor != None) {
+      listener.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
     if (taskFailed) {
@@ -249,32 +249,42 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
-  def slaveLost(slaveId: String, reason: ExecutorLossReason) {
-    var failedHost: Option[String] = None
+  def executorLost(executorId: String, reason: ExecutorLossReason) {
+    var failedExecutor: Option[String] = None
     synchronized {
-      slaveIdToHost.get(slaveId) match {
-        case Some(host) =>
-          if (hostsAlive.contains(host)) {
-            logError("Lost an executor on " + host + ": " + reason)
-            slaveIdsWithExecutors -= slaveId
-            hostsAlive -= host
-            activeTaskSetsQueue.foreach(_.hostLost(host))
-            failedHost = Some(host)
-          } else {
-            // We may get multiple slaveLost() calls with different loss reasons. For example, one 
-            // may be triggered by a dropped connection from the slave while another may be a report
-            // of executor termination from Mesos. We produce log messages for both so we eventually
-            // report the termination reason.
-            logError("Lost an executor on " + host + " (already removed): " + reason)
-          }
-        case None =>
-          // We were told about a slave being lost before we could even allocate work to it
-          logError("Lost slave " + slaveId + " (no work assigned yet)")
+      if (activeExecutorIds.contains(executorId)) {
+        val host = executorIdToHost(executorId)
+        logError("Lost executor %s on %s: %s".format(executorId, host, reason))
+        removeExecutor(executorId)
+        failedExecutor = Some(executorId)
+      } else {
+        // We may get multiple executorLost() calls with different loss reasons. For example, one
+        // may be triggered by a dropped connection from the slave while another may be a report
+        // of executor termination from Mesos. We produce log messages for both so we eventually
+        // report the termination reason.
+        logError("Lost executor " + executorId + " (already removed): " + reason)
       }
     }
-    if (failedHost != None) {
-      listener.hostLost(failedHost.get)
+    // Call listener.executorLost without holding the lock on this to prevent deadlock
+    if (failedExecutor != None) {
+      listener.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
   }
+
+  /** Get a list of hosts that currently have executors */
+  def hostsAlive: scala.collection.Set[String] = executorsByHost.keySet
+
+  /** Remove an executor from all our data structures and mark it as lost */
+  private def removeExecutor(executorId: String) {
+    activeExecutorIds -= executorId
+    val host = executorIdToHost(executorId)
+    val execs = executorsByHost.getOrElse(host, new HashSet)
+    execs -= executorId
+    if (execs.isEmpty) {
+      executorsByHost -= host
+    }
+    executorIdToHost -= executorId
+    activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
+  }
 }
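
The new executor bookkeeping can be read in isolation as the following sketch (mirroring the code above, not the actual ClusterScheduler): hostsAlive is no longer stored, it is derived from executorsByHost.

    import scala.collection.mutable.{HashMap, HashSet}

    val executorsByHost = new HashMap[String, HashSet[String]]

    def addExecutor(execId: String, host: String) {
      executorsByHost.getOrElseUpdate(host, new HashSet()) += execId
    }

    def removeExecutor(execId: String, host: String) {
      executorsByHost.get(host).foreach { execs =>
        execs -= execId
        if (execs.isEmpty) executorsByHost -= host  // host drops out of hostsAlive
      }
    }

    def hostsAlive: scala.collection.Set[String] = executorsByHost.keySet
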
diff --git a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
deleted file mode 100644
index 96ebaa460118e45a9dbd40cb3a5c0c36a0b13907..0000000000000000000000000000000000000000
--- a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
+++ /dev/null
@@ -1,4 +0,0 @@
-package spark.scheduler.cluster
-
-private[spark]
-class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 866beb6d01efac41320d4dba49dcf8a718018bb7..9760d23072d68e2b84445e0a1f569b4b0f05ef0c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,7 +19,6 @@ private[spark] class SparkDeploySchedulerBackend(
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
   val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
-  val executorIdToWorkerId = new HashMap[String, String]
 
   // Memory used by each executor (in megabytes)
   val executorMemory = {
@@ -38,7 +37,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val driverUrl = "akka://spark@%s:%s/user/%s".format(
       System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
       StandaloneSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
     val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(throw new IllegalArgumentException("must supply spark home for spark standalone"))
     val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command, sparkHome)
@@ -48,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   override def stop() {
-    stopping = true;
+    stopping = true
     super.stop()
     client.stop()
     if (shutdownCallback != null) {
@@ -67,24 +66,17 @@ private[spark] class SparkDeploySchedulerBackend(
     }
   }
 
-  override def executorAdded(fullId: String, workerId: String, host: String, cores: Int, memory: Int) {
-    executorIdToWorkerId += fullId -> workerId
+  override def executorAdded(executorId: String, workerId: String, host: String, cores: Int, memory: Int) {
     logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
-       fullId, host, cores, Utils.memoryMegabytesToString(memory)))
+       executorId, host, cores, Utils.memoryMegabytesToString(memory)))
   }
 
-  override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
+  override def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code)
       case None => SlaveLost(message)
     }
-    logInfo("Executor %s removed: %s".format(fullId, message))
-    executorIdToWorkerId.get(fullId) match {
-      case Some(workerId) => 
-        executorIdToWorkerId.remove(fullId)
-        scheduler.slaveLost(workerId, reason)
-      case None =>
-        logInfo("No worker ID known for executor %s".format(fullId))
-    }
+    logInfo("Executor %s removed: %s".format(executorId, message))
+    scheduler.executorLost(executorId, reason)
   }
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index bea9dc4f23a769684014fc3ec84ca2657951285b..da7dcf4b6b48e8b5eb851fbef8d48d79e20dc09e 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -11,24 +11,26 @@ private[spark]
 case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
 
 private[spark]
-case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
+case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
+  extends StandaloneClusterMessage
 
 private[spark]
-case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
+case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
 
 // Executors to driver
 private[spark]
-case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+case class RegisterExecutor(executorId: String, host: String, cores: Int)
+  extends StandaloneClusterMessage
 
 private[spark]
-case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
+case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
   extends StandaloneClusterMessage
 
 private[spark]
 object StatusUpdate {
   /** Alternate factory method that takes a ByteBuffer directly for the data field */
-  def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
-    StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data))
+  def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
+    StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
   }
 }
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index d742a7b2bf8480ba179c14e9015296fcf81f4bd6..082022be1c9da0a487e65879e57814b793ebe838 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -24,12 +24,12 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
   var totalCoreCount = new AtomicInteger(0)
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
-    val slaveActor = new HashMap[String, ActorRef]
-    val slaveAddress = new HashMap[String, Address]
-    val slaveHost = new HashMap[String, String]
+    val executorActor = new HashMap[String, ActorRef]
+    val executorAddress = new HashMap[String, Address]
+    val executorHost = new HashMap[String, String]
     val freeCores = new HashMap[String, Int]
-    val actorToSlaveId = new HashMap[ActorRef, String]
-    val addressToSlaveId = new HashMap[Address, String]
+    val actorToExecutorId = new HashMap[ActorRef, String]
+    val addressToExecutorId = new HashMap[Address, String]
 
     override def preStart() {
       // Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -37,28 +37,28 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     }
 
     def receive = {
-      case RegisterSlave(workerId, host, cores) =>
-        if (slaveActor.contains(workerId)) {
-          sender ! RegisterSlaveFailed("Duplicate slave ID: " + workerId)
+      case RegisterExecutor(executorId, host, cores) =>
+        if (executorActor.contains(executorId)) {
+          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
         } else {
-          logInfo("Registered slave: " + sender + " with ID " + workerId)
-          sender ! RegisteredSlave(sparkProperties)
+          logInfo("Registered executor: " + sender + " with ID " + executorId)
+          sender ! RegisteredExecutor(sparkProperties)
           context.watch(sender)
-          slaveActor(workerId) = sender
-          slaveHost(workerId) = host
-          freeCores(workerId) = cores
-          slaveAddress(workerId) = sender.path.address
-          actorToSlaveId(sender) = workerId
-          addressToSlaveId(sender.path.address) = workerId
+          executorActor(executorId) = sender
+          executorHost(executorId) = host
+          freeCores(executorId) = cores
+          executorAddress(executorId) = sender.path.address
+          actorToExecutorId(sender) = executorId
+          addressToExecutorId(sender.path.address) = executorId
           totalCoreCount.addAndGet(cores)
           makeOffers()
         }
 
-      case StatusUpdate(workerId, taskId, state, data) =>
+      case StatusUpdate(executorId, taskId, state, data) =>
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
-          freeCores(workerId) += 1
-          makeOffers(workerId)
+          freeCores(executorId) += 1
+          makeOffers(executorId)
         }
 
       case ReviveOffers =>
@@ -69,47 +69,47 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
         context.stop(self)
 
       case Terminated(actor) =>
-        actorToSlaveId.get(actor).foreach(removeSlave(_, "Akka actor terminated"))
+        actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
 
       case RemoteClientDisconnected(transport, address) =>
-        addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client disconnected"))
+        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
 
       case RemoteClientShutdown(transport, address) =>
-        addressToSlaveId.get(address).foreach(removeSlave(_, "remote Akka client shutdown"))
+        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
     }
 
-    // Make fake resource offers on all slaves
+    // Make fake resource offers on all executors
     def makeOffers() {
       launchTasks(scheduler.resourceOffers(
-        slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+        executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
     }
 
-    // Make fake resource offers on just one slave
-    def makeOffers(workerId: String) {
+    // Make fake resource offers on just one executor
+    def makeOffers(executorId: String) {
       launchTasks(scheduler.resourceOffers(
-        Seq(new WorkerOffer(workerId, slaveHost(workerId), freeCores(workerId)))))
+        Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
     }
 
     // Launch tasks returned by a set of resource offers
     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        freeCores(task.slaveId) -= 1
-        slaveActor(task.slaveId) ! LaunchTask(task)
+        freeCores(task.executorId) -= 1
+        executorActor(task.executorId) ! LaunchTask(task)
       }
     }
 
     // Remove a disconnected slave from the cluster
-    def removeSlave(workerId: String, reason: String) {
-      logInfo("Slave " + workerId + " disconnected, so removing it")
-      val numCores = freeCores(workerId)
-      actorToSlaveId -= slaveActor(workerId)
-      addressToSlaveId -= slaveAddress(workerId)
-      slaveActor -= workerId
-      slaveHost -= workerId
-      freeCores -= workerId
-      slaveHost -= workerId
+    def removeExecutor(executorId: String, reason: String) {
+      logInfo("Slave " + executorId + " disconnected, so removing it")
+      val numCores = freeCores(executorId)
+      actorToExecutorId -= executorActor(executorId)
+      addressToExecutorId -= executorAddress(executorId)
+      executorActor -= executorId
+      executorHost -= executorId
+      freeCores -= executorId
+      executorAddress -= executorId
       totalCoreCount.addAndGet(-numCores)
-      scheduler.slaveLost(workerId, SlaveLost(reason))
+      scheduler.executorLost(executorId, SlaveLost(reason))
     }
   }
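The renamed bookkeeping above is easiest to see in isolation. Below is a minimal, self-contained sketch (not the actual Akka actor, with the WorkerOffer reduced to a tuple) of how the per-executor maps are kept in sync across registration, offers, and removal:

    import scala.collection.mutable.HashMap

    object ExecutorBookkeepingSketch {
      val executorHost = new HashMap[String, String]   // executor ID -> host
      val freeCores = new HashMap[String, Int]         // executor ID -> currently unused cores

      // Mirrors RegisterExecutor handling: remember the executor and its cores.
      def register(executorId: String, host: String, cores: Int) {
        executorHost(executorId) = host
        freeCores(executorId) = cores
      }

      // Mirrors makeOffers(): one (executorId, host, freeCores) entry per known executor.
      def offers: Seq[(String, String, Int)] =
        executorHost.toSeq.map { case (id, host) => (id, host, freeCores(id)) }

      // Mirrors removeExecutor(): drop every entry keyed by the departing executor ID.
      def remove(executorId: String) {
        executorHost -= executorId
        freeCores -= executorId
      }
    }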
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index aa097fd3a25c1e89fd6793237aa134fb47d55e88..b41e951be99f84c68b800a1cf7a46636d3ced3f8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -5,7 +5,7 @@ import spark.util.SerializableBuffer
 
 private[spark] class TaskDescription(
     val taskId: Long,
-    val slaveId: String,
+    val executorId: String,
     val name: String,
     _serializedTask: ByteBuffer)
   extends Serializable {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index ca845037801619788be9878a5c4d26c3f9243d4f..0f975ce1eb4192caa024b279c16328b2e312d806 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -4,7 +4,12 @@ package spark.scheduler.cluster
  * Information about a running task attempt inside a TaskSet.
  */
 private[spark]
-class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) {
+class TaskInfo(
+    val taskId: Long,
+    val index: Int,
+    val launchTime: Long,
+    val executorId: String,
+    val host: String) {
   var finishTime: Long = 0
   var failed = false
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index a089b71644d3b643f5c6497e02a010998569b20b..26201ad0dd1a0314d01b11f1d253dee40eadca13 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -138,10 +138,11 @@ private[spark] class TaskSetManager(
   // attempt running on this host, in case the host is slow. In addition, if localOnly is set, the
   // task must have a preference for this host (or no preferred locations at all).
   def findSpeculativeTask(host: String, localOnly: Boolean): Option[Int] = {
+    val hostsAlive = sched.hostsAlive
     speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
     val localTask = speculatableTasks.find {
         index =>
-          val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
+          val locations = tasks(index).preferredLocations.toSet & hostsAlive
           val attemptLocs = taskAttempts(index).map(_.host)
           (locations.size == 0 || locations.contains(host)) && !attemptLocs.contains(host)
       }
@@ -189,7 +190,7 @@ private[spark] class TaskSetManager(
   }
 
   // Respond to an offer of a single slave from the scheduler by finding a task
-  def slaveOffer(slaveId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+  def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -206,11 +207,11 @@ private[spark] class TaskSetManager(
           } else {
             "non-preferred, not one of " + task.preferredLocations.mkString(", ")
           }
-          logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
-            taskSet.id, index, taskId, slaveId, host, prefStr))
+          logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
+            taskSet.id, index, taskId, execId, host, prefStr))
           // Do various bookkeeping
           copiesRunning(index) += 1
-          val info = new TaskInfo(taskId, index, time, host)
+          val info = new TaskInfo(taskId, index, time, execId, host)
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           if (preferred) {
@@ -224,7 +225,7 @@ private[spark] class TaskSetManager(
           logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
             taskSet.id, index, serializedTask.limit, timeTaken))
           val taskName = "task %s:%d".format(taskSet.id, index)
-          return Some(new TaskDescription(taskId, slaveId, taskName, serializedTask))
+          return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
         }
         case _ =>
       }
@@ -356,19 +357,22 @@ private[spark] class TaskSetManager(
     sched.taskSetFinished(this)
   }
 
-  def hostLost(hostname: String) {
-    logInfo("Re-queueing tasks for " + hostname + " from TaskSet " + taskSet.id)
-    // If some task has preferred locations only on hostname, put it in the no-prefs list
-    // to avoid the wait from delay scheduling
-    for (index <- getPendingTasksForHost(hostname)) {
-      val newLocs = tasks(index).preferredLocations.toSet & sched.hostsAlive
-      if (newLocs.isEmpty) {
-        pendingTasksWithNoPrefs += index
+  def executorLost(execId: String, hostname: String) {
+    logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
+    val newHostsAlive = sched.hostsAlive
+    // If some task has preferred locations only on hostname, and there are no more executors there,
+    // put it in the no-prefs list to avoid the wait from delay scheduling
+    if (!newHostsAlive.contains(hostname)) {
+      for (index <- getPendingTasksForHost(hostname)) {
+        val newLocs = tasks(index).preferredLocations.toSet & newHostsAlive
+        if (newLocs.isEmpty) {
+          pendingTasksWithNoPrefs += index
+        }
       }
     }
-    // Re-enqueue any tasks that ran on the failed host if this is a shuffle map stage
+    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
     if (tasks(0).isInstanceOf[ShuffleMapTask]) {
-      for ((tid, info) <- taskInfos if info.host == hostname) {
+      for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
         if (finished(index)) {
           finished(index) = false
@@ -382,7 +386,7 @@ private[spark] class TaskSetManager(
       }
     }
     // Also re-enqueue any tasks that were running on the node
-    for ((tid, info) <- taskInfos if info.running && info.host == hostname) {
+    for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       taskLost(tid, TaskState.KILLED, null)
     }
   }
diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
index 6b919d68b26e677005fb6f17a9d233c4ce19d628..3c3afcbb14d3f7f677be8aae91128e35eef01c7f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
@@ -1,8 +1,8 @@
 package spark.scheduler.cluster
 
 /**
- * Represents free resources available on a worker node.
+ * Represents free resources available on an executor.
  */
 private[spark]
-class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) {
+class WorkerOffer(val executorId: String, val hostname: String, val cores: Int) {
 }
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 2989e31f5e9d8ed087064f4acee19335ad86ec7b..f3467db86b6eeeaca53dc8481618958cb17ebcda 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -268,7 +268,7 @@ private[spark] class MesosSchedulerBackend(
     synchronized {
       slaveIdsWithExecutors -= slaveId.getValue
     }
-    scheduler.slaveLost(slaveId.getValue, reason)
+    scheduler.executorLost(slaveId.getValue, reason)
   }
 
   override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 19cdaaa984225bc19b428e5dfd02dbea79720bef..1215d5f5c8a3d7594c48afd40b012a9088fef282 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -30,6 +30,7 @@ extends Exception(message)
 
 private[spark]
 class BlockManager(
+    executorId: String,
     actorSystem: ActorSystem,
     val master: BlockManagerMaster,
     val serializer: Serializer,
@@ -68,8 +69,8 @@ class BlockManager(
   val connectionManager = new ConnectionManager(0)
   implicit val futureExecContext = connectionManager.futureExecContext
 
-  val connectionManagerId = connectionManager.id
-  val blockManagerId = BlockManagerId(connectionManagerId.host, connectionManagerId.port)
+  val blockManagerId = BlockManagerId(
+    executorId, connectionManager.id.host, connectionManager.id.port)
 
   // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
   // for receiving shuffle outputs)
@@ -90,7 +91,10 @@ class BlockManager(
   val slaveActor = master.actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
     name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
 
-  @volatile private var shuttingDown = false
+  // Pending reregistration action being executed asynchronously or null if none
+  // is pending. Accesses should synchronize on asyncReregisterLock.
+  var asyncReregisterTask: Future[Unit] = null
+  val asyncReregisterLock = new Object
 
   private def heartBeat() {
     if (!master.sendHeartBeat(blockManagerId)) {
@@ -106,8 +110,9 @@ class BlockManager(
   /**
    * Construct a BlockManager with a memory limit set based on system properties.
    */
-  def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = {
-    this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+  def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
+           serializer: Serializer) = {
+    this(execId, actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
   }
 
   /**
@@ -147,6 +152,8 @@ class BlockManager(
   /**
    * Reregister with the master and report all blocks to it. This will be called by the heart beat
    * thread if our heartbeat to the block amnager indicates that we were not registered.
+   *
+   * Note that this method must be called without any BlockInfo locks held.
    */
   def reregister() {
     // TODO: We might need to rate limit reregistering.
@@ -155,6 +162,32 @@ class BlockManager(
     reportAllBlocks()
   }
 
+  /**
+   * Reregister with the master sometime soon.
+   */
+  def asyncReregister() {
+    asyncReregisterLock.synchronized {
+      if (asyncReregisterTask == null) {
+        asyncReregisterTask = Future[Unit] {
+          reregister()
+          asyncReregisterLock.synchronized {
+            asyncReregisterTask = null
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * For testing. Wait for any pending asynchronous reregistration; otherwise, do nothing.
+   */
+  def waitForAsyncReregister() {
+    val task = asyncReregisterTask
+    if (task != null) {
+      Await.ready(task, Duration.Inf)
+    }
+  }
+
   /**
    * Get storage level of local block. If no info exists for the block, then returns null.
    */
@@ -170,7 +203,7 @@ class BlockManager(
     if (needReregister) {
       logInfo("Got told to reregister updating block " + blockId)
       // Reregistering will report our new block for free.
-      reregister()
+      asyncReregister()
     }
     logDebug("Told master about block " + blockId)
   }
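The asyncReregister / waitForAsyncReregister pair added above is an "at most one pending background retry" pattern: the lock guarantees only one reregistration future is in flight, and the test hook lets callers block until it completes. A rough sketch of the same idea using plain scala.concurrent (names and executor choice are illustrative, not the BlockManager code itself):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.Duration
    import scala.concurrent.ExecutionContext.Implicits.global

    class OneShotBackgroundRetry(doWork: () => Unit) {
      private val lock = new Object
      @volatile private var pending: Future[Unit] = null

      // Kick off the work in the background unless a run is already pending.
      def trigger() {
        lock.synchronized {
          if (pending == null) {
            pending = Future {
              doWork()
              lock.synchronized { pending = null }
            }
          }
        }
      }

      // For tests: block until any pending background run has finished.
      def waitForCompletion() {
        val task = pending
        if (task != null) Await.ready(task, Duration.Inf)
      }
    }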
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
index abb8b45a1f48b7aa1c8ce681922a5c2f2e68b930..f2f1e77d41a65cb810fd58904626dccb7b5caaff 100644
--- a/core/src/main/scala/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -7,27 +7,32 @@ import java.util.concurrent.ConcurrentHashMap
  * This class represent an unique identifier for a BlockManager.
  * The first 2 constructors of this class is made private to ensure that
  * BlockManagerId objects can be created only using the factory method in
- * [[spark.storage.BlockManager$]]. This allows de-duplication of id objects.
+ * [[spark.storage.BlockManager$]]. This allows de-duplication of ID objects.
  * Also, constructor parameters are private to ensure that parameters cannot
  * be modified from outside this class.
  */
 private[spark] class BlockManagerId private (
+    private var executorId_ : String,
     private var ip_ : String,
     private var port_ : Int
   ) extends Externalizable {
 
-  private def this() = this(null, 0)  // For deserialization only
+  private def this() = this(null, null, 0)  // For deserialization only
 
-  def ip = ip_
+  def executorId: String = executorId_
 
-  def port = port_
+  def ip: String = ip_
+
+  def port: Int = port_
 
   override def writeExternal(out: ObjectOutput) {
+    out.writeUTF(executorId_)
     out.writeUTF(ip_)
     out.writeInt(port_)
   }
 
   override def readExternal(in: ObjectInput) {
+    executorId_ = in.readUTF()
     ip_ = in.readUTF()
     port_ = in.readInt()
   }
@@ -35,21 +40,23 @@ private[spark] class BlockManagerId private (
   @throws(classOf[IOException])
   private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
 
-  override def toString = "BlockManagerId(" + ip + ", " + port + ")"
+  override def toString = "BlockManagerId(%s, %s, %d)".format(executorId, ip, port)
 
-  override def hashCode = ip.hashCode * 41 + port
+  override def hashCode: Int = (executorId.hashCode * 41 + ip.hashCode) * 41 + port
 
   override def equals(that: Any) = that match {
-    case id: BlockManagerId => port == id.port && ip == id.ip
-    case _ => false
+    case id: BlockManagerId =>
+      executorId == id.executorId && port == id.port && ip == id.ip
+    case _ =>
+      false
   }
 }
 
 
 private[spark] object BlockManagerId {
 
-  def apply(ip: String, port: Int) =
-    getCachedBlockManagerId(new BlockManagerId(ip, port))
+  def apply(execId: String, ip: String, port: Int) =
+    getCachedBlockManagerId(new BlockManagerId(execId, ip, port))
 
   def apply(in: ObjectInput) = {
     val obj = new BlockManagerId()
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 9fd2b454a43dc9549e1f2c740e57657d0ffb2359..36398095a2e8ac97f9455f04d206a9b4fe9e31dd 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -1,6 +1,10 @@
 package spark.storage
 
-import scala.collection.mutable.ArrayBuffer
+import java.io._
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 
 import akka.actor.{Actor, ActorRef, ActorSystem, Props}
@@ -19,7 +23,7 @@ private[spark] class BlockManagerMaster(
     driverPort: Int)
   extends Logging {
 
-  val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
+  val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
   val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
 
   val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager"
@@ -40,10 +44,10 @@ private[spark] class BlockManagerMaster(
     }
   }
 
-  /** Remove a dead host from the driver actor. This is only called on the driver side. */
-  def notifyADeadHost(host: String) {
-    tell(RemoveHost(host))
-    logInfo("Removed " + host + " successfully in notifyADeadHost")
+  /** Remove a dead executor from the driver actor. This is only called on the driver side. */
+  def removeExecutor(execId: String) {
+    tell(RemoveExecutor(execId))
+    logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
   /**
@@ -141,7 +145,7 @@ private[spark] class BlockManagerMaster(
     }
     var attempts = 0
     var lastException: Exception = null
-    while (attempts < AKKA_RETRY_ATTEMPS) {
+    while (attempts < AKKA_RETRY_ATTEMPTS) {
       attempts += 1
       try {
         val future = driverActor.ask(message)(timeout)
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index f4d026da3329c801775275899a7f2dd94136f6dc..f88517f1a39ec9c9047013e1445ccab09b675a2b 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -23,9 +23,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
   private val blockManagerInfo =
     new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
 
-  // Mapping from host name to block manager id. We allow multiple block managers
-  // on the same host name (ip).
-  private val blockManagerIdByHost = new HashMap[String, ArrayBuffer[BlockManagerId]]
+  // Mapping from executor ID to block manager ID.
+  private val blockManagerIdByExecutor = new HashMap[String, BlockManagerId]
 
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
@@ -68,11 +67,14 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     case GetMemoryStatus =>
       getMemoryStatus
 
+    case GetStorageStatus =>
+      getStorageStatus
+
     case RemoveBlock(blockId) =>
       removeBlock(blockId)
 
-    case RemoveHost(host) =>
-      removeHost(host)
+    case RemoveExecutor(execId) =>
+      removeExecutor(execId)
       sender ! true
 
     case StopBlockManagerMaster =>
@@ -96,16 +98,12 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
   def removeBlockManager(blockManagerId: BlockManagerId) {
     val info = blockManagerInfo(blockManagerId)
 
-    // Remove the block manager from blockManagerIdByHost. If the list of block
-    // managers belonging to the IP is empty, remove the entry from the hash map.
-    blockManagerIdByHost.get(blockManagerId.ip).foreach { managers: ArrayBuffer[BlockManagerId] =>
-      managers -= blockManagerId
-      if (managers.size == 0) blockManagerIdByHost.remove(blockManagerId.ip)
-    }
+    // Remove the block manager from blockManagerIdByExecutor.
+    blockManagerIdByExecutor -= blockManagerId.executorId
 
     // Remove it from blockManagerInfo and remove all the blocks.
     blockManagerInfo.remove(blockManagerId)
-    var iterator = info.blocks.keySet.iterator
+    val iterator = info.blocks.keySet.iterator
     while (iterator.hasNext) {
       val blockId = iterator.next
       val locations = blockLocations.get(blockId)._2
@@ -130,17 +128,15 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     toRemove.foreach(removeBlockManager)
   }
 
-  def removeHost(host: String) {
-    logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
-    logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
-    blockManagerIdByHost.get(host).foreach(_.foreach(removeBlockManager))
-    logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
+  def removeExecutor(execId: String) {
+    logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
+    blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
     sender ! true
   }
 
   def heartBeat(blockManagerId: BlockManagerId) {
     if (!blockManagerInfo.contains(blockManagerId)) {
-      if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+      if (blockManagerId.executorId == "<driver>" && !isLocal) {
         sender ! true
       } else {
         sender ! false
@@ -177,24 +173,28 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     sender ! res
   }
 
-  private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
-    val startTimeMs = System.currentTimeMillis()
-    val tmp = " " + blockManagerId + " "
+  private def getStorageStatus() {
+    import scala.collection.JavaConverters._
+    val res = blockManagerInfo.map { case (blockManagerId, info) =>
+      StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap)
+    }
+    sender ! res
+  }
 
-    if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
-      logInfo("Got Register Msg from master node, don't register it")
-    } else {
-      blockManagerIdByHost.get(blockManagerId.ip) match {
-        case Some(managers) =>
-          // A block manager of the same host name already exists.
-          logInfo("Got another registration for host " + blockManagerId)
-          managers += blockManagerId
+  private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+    if (id.executorId == "<driver>" && !isLocal) {
+      // Got a register message from the master node; don't register it
+    } else if (!blockManagerInfo.contains(id)) {
+      blockManagerIdByExecutor.get(id.executorId) match {
+        case Some(manager) =>
+          // A block manager with this executor ID already exists
+          logError("Got two different block manager registrations on " + id.executorId)
+          System.exit(1)
         case None =>
-          blockManagerIdByHost += (blockManagerId.ip -> ArrayBuffer(blockManagerId))
+          blockManagerIdByExecutor(id.executorId) = id
       }
-
-      blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo(
-        blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
+      blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
+        id, System.currentTimeMillis(), maxMemSize, slaveActor)
     }
     sender ! true
   }
@@ -206,11 +206,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
       memSize: Long,
       diskSize: Long) {
 
-    val startTimeMs = System.currentTimeMillis()
-    val tmp = " " + blockManagerId + " " + blockId + " "
-
     if (!blockManagerInfo.contains(blockManagerId)) {
-      if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+      if (blockManagerId.executorId == "<driver>" && !isLocal) {
         // We intentionally do not register the master (except in local mode),
         // so we should not indicate failure.
         sender ! true
@@ -342,8 +339,8 @@ object BlockManagerMasterActor {
       _lastSeenMs = System.currentTimeMillis()
     }
 
-    def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
-      : Unit = synchronized {
+    def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long,
+                        diskSize: Long) {
 
       updateLastSeenMs()
 
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index 30483b0b37e5c2c45875b1e77eaa8ef26d4e0dab..1494f901037c9d2ff74c8922dae4113f3c047cb8 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -88,7 +88,7 @@ private[spark]
 case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
 
 private[spark]
-case class RemoveHost(host: String) extends ToBlockManagerMaster
+case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
 
 private[spark]
 case object StopBlockManagerMaster extends ToBlockManagerMaster
@@ -98,3 +98,6 @@ case object GetMemoryStatus extends ToBlockManagerMaster
 
 private[spark]
 case object ExpireDeadHosts extends ToBlockManagerMaster
+
+private[spark]
+case object GetStorageStatus extends ToBlockManagerMaster
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
new file mode 100644
index 0000000000000000000000000000000000000000..eda320fa47972074228b01964962c63e73bbd8fa
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -0,0 +1,85 @@
+package spark.storage
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.util.duration._
+import cc.spray.directives._
+import cc.spray.typeconversion.TwirlSupport._
+import cc.spray.Directives
+import scala.collection.mutable.ArrayBuffer
+import spark.{Logging, SparkContext}
+import spark.util.AkkaUtils
+import spark.Utils
+
+
+/**
+ * Web UI server for the BlockManager inside each SparkContext.
+ */
+private[spark]
+class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
+  extends Directives with Logging {
+
+  val STATIC_RESOURCE_DIR = "spark/deploy/static"
+
+  implicit val timeout = Timeout(10 seconds)
+
+  /** Start an HTTP server to run the web interface */
+  def start() {
+    try {
+      val port = if (System.getProperty("spark.ui.port") != null) {
+        System.getProperty("spark.ui.port").toInt
+      } else {
+        // TODO: Unfortunately, it's not possible to pass port 0 to spray and figure out which
+        // random port it bound to, so we have to try to find a local one by creating a socket.
+        Utils.findFreePort()
+      }
+      AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
+      logInfo("Started BlockManager web UI at http://%s:%d".format(Utils.localHostName(), port))
+    } catch {
+      case e: Exception =>
+        logError("Failed to create BlockManager WebUI", e)
+        System.exit(1)
+    }
+  }
+
+  val handler = {
+    get {
+      path("") {
+        completeWith {
+          // Request the current storage status from the Master
+          val future = blockManagerMaster ? GetStorageStatus
+          future.map { status =>
+            // Calculate macro-level statistics
+            val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+            val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+            val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+            val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+              .reduceOption(_+_).getOrElse(0L)
+            val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+            spark.storage.html.index.
+              render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
+          }
+        }
+      } ~
+      path("rdd") {
+        parameter("id") { id =>
+          completeWith {
+            val future = blockManagerMaster ? GetStorageStatus
+            future.map { status =>
+              val prefix = "rdd_" + id.toString
+              val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+              val filteredStorageStatusList = StorageUtils.
+                filterStorageStatusByPrefix(storageStatusList, prefix)
+              val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+              spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
+            }
+          }
+        }
+      } ~
+      pathPrefix("static") {
+        getFromResourceDirectory(STATIC_RESOURCE_DIR)
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index d1d1c61c1cb37fb98f13460d23095536f197f24e..3b5a77ab228bb2df833cd04fa6e22e8ba93dd6dd 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -80,6 +80,14 @@ class StorageLevel private(
     "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
 
   override def hashCode(): Int = toInt * 41 + replication
+  def description: String = {
+    var result = ""
+    result += (if (useDisk) "Disk " else "")
+    result += (if (useMemory) "Memory " else "")
+    result += (if (deserialized) "Deserialized " else "Serialized ")
+    result += "%sx Replicated".format(replication)
+    result
+  }
 }
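For reference, here is roughly what description produces for a few of the predefined levels (a sketch; the exact strings follow directly from each level's flags and replication factor):

    StorageLevel.MEMORY_ONLY.description        // "Memory Deserialized 1x Replicated"
    StorageLevel.MEMORY_ONLY_SER_2.description  // "Memory Serialized 2x Replicated"
    StorageLevel.DISK_ONLY.description          // "Disk Serialized 1x Replicated"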
 
 
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a10e3a95c60d3a0f4288adf0da42827276f12c18
--- /dev/null
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -0,0 +1,78 @@
+package spark.storage
+
+import spark.SparkContext
+import BlockManagerMasterActor.BlockStatus
+
+private[spark]
+case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
+    blocks: Map[String, BlockStatus]) {
+
+  def memUsed(blockPrefix: String = ""): Long = {
+    blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.memSize)
+      .reduceOption(_ + _).getOrElse(0L)
+  }
+
+  def diskUsed(blockPrefix: String = ""): Long = {
+    blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.diskSize)
+      .reduceOption(_ + _).getOrElse(0L)
+  }
+
+  def memRemaining: Long = maxMem - memUsed()
+
+}
+
+case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
+  numPartitions: Int, memSize: Long, diskSize: Long)
+
+
+/* Helper methods for storage-related objects */
+private[spark]
+object StorageUtils {
+
+  /* Given the current storage status of the BlockManager, returns information for each RDD */
+  def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus],
+      sc: SparkContext): Array[RDDInfo] = {
+    rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc)
+  }
+
+  /* Given a list of BlockStatus objects, returns information for each RDD */
+  def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
+      sc: SparkContext): Array[RDDInfo] = {
+    // Find all RDD blocks (ignore broadcast variables)
+    val rddBlocks = infos.filterKeys(_.startsWith("rdd"))
+
+    // Group the RDD blocks by RDD ID, dropping the partition suffix from each block name
+    val groupedRddBlocks = rddBlocks.groupBy { case (k, v) =>
+      k.substring(0, k.lastIndexOf('_'))
+    }.mapValues(_.values.toArray)
+
+    // For each RDD, generate an RDDInfo object
+    groupedRddBlocks.map { case(rddKey, rddBlocks) =>
+
+      // Add up memory and disk sizes
+      val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
+      val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _)
+
+      // Find the id of the RDD, e.g. rdd_1 => 1
+      val rddId = rddKey.split("_").last.toInt
+      // Get the friendly name for the rdd, if available.
+      val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey)
+      val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel
+      
+      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize)
+    }.toArray
+  }
+
+  /* Filters out BlockStatus entries whose block ID does not start with the given prefix */
+  def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus],
+      prefix: String): Array[StorageStatus] = {
+
+    storageStatusList.map { status =>
+      val newBlocks = status.blocks.filterKeys(_.startsWith(prefix))
+      //val newRemainingMem = status.maxMem - newBlocks.values.map(_.memSize).reduce(_ + _)
+      StorageStatus(status.blockManagerId, status.maxMem, newBlocks)
+    }
+
+  }
+
+}
\ No newline at end of file
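A small illustration of the block-name handling above, assuming RDD blocks are named "rdd_<rddId>_<splitIndex>":

    val blockName = "rdd_3_7"                                        // block for split 7 of RDD 3
    val rddKey = blockName.substring(0, blockName.lastIndexOf('_'))  // "rdd_3" (partition suffix dropped)
    val rddId = rddKey.split("_").last.toInt                         // 3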
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index 0b8f6d43031c4802787365c02c28cec905f3ca7e..a70d1c8e78e109523552f1b9cdb1a7b47d2200fd 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -78,7 +78,8 @@ private[spark] object ThreadingTest {
     val driverIp: String = System.getProperty("spark.driver.host", "localhost")
     val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
     val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, driverIp, driverPort)
-    val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
+    val blockManager = new BlockManager(
+      "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024)
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
     val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
     producers.foreach(_.start)
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index fbd0ff46bffe36fddf17a8056633c4cba46a6544..e0fdeffbc44d88a51ea43cb3a7c22ea5d594fc04 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -1,6 +1,6 @@
 package spark.util
 
-import akka.actor.{Props, ActorSystemImpl, ActorSystem}
+import akka.actor.{ActorRef, Props, ActorSystemImpl, ActorSystem}
 import com.typesafe.config.ConfigFactory
 import akka.util.duration._
 import akka.pattern.ask
@@ -52,21 +52,22 @@ private[spark] object AkkaUtils {
 
   /**
    * Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to
-   * handle requests. Throws a SparkException if this fails.
+   * handle requests. Returns the bound port or throws a SparkException on failure.
    */
-  def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) {
+  def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route, 
+      name: String = "HttpServer"): ActorRef = {
     val ioWorker = new IoWorker(actorSystem).start()
     val httpService = actorSystem.actorOf(Props(new HttpService(route)))
     val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
     val server = actorSystem.actorOf(
-      Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
+      Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = name)
     actorSystem.registerOnTermination { ioWorker.stop() }
     val timeout = 3.seconds
     val future = server.ask(HttpServer.Bind(ip, port))(timeout)
     try {
       Await.result(future, timeout) match {
         case bound: HttpServer.Bound =>
-          return
+          return server
         case other: Any =>
           throw new SparkException("Failed to bind web UI to port " + port + ": " + other)
       }
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index 139e21d09e00c6494c7b728967c50266b82d2e30..51fb440108c6568c963e75cbdf90bf8a8825e1c6 100644
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -5,6 +5,9 @@ import java.util.{TimerTask, Timer}
 import spark.Logging
 
 
+/**
+ * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
+ */
 class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
 
   val delaySeconds = MetadataCleaner.getDelaySeconds
@@ -14,18 +17,16 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
   val task = new TimerTask {
     def run() {
       try {
-        if (delaySeconds > 0) {
-          cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
-          logInfo("Ran metadata cleaner for " + name)
-        }
+        cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
+        logInfo("Ran metadata cleaner for " + name)
       } catch {
         case e: Exception => logError("Error running cleanup task for " + name, e)
       }
     }
   }
 
-  if (periodSeconds > 0) {
-    logInfo(
+  if (delaySeconds > 0) {
+    logDebug(
       "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
       + "period of " + periodSeconds + " secs")
     timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
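A hedged usage sketch: the cleanup function receives a threshold timestamp from the timer task and is expected to drop anything older; whether a timer is scheduled at all depends on the configured delay (delaySeconds > 0). The map below is only an illustration, not part of the Spark API:

    import scala.collection.mutable

    val lastSeen = new mutable.HashMap[String, Long]  // key -> insertion time in millis

    val cleaner = new MetadataCleaner("exampleCache", cleanupTime => {
      // Drop every entry whose timestamp is older than the threshold passed in by the timer task.
      lastSeen.synchronized {
        lastSeen.retain { case (_, insertedAt) => insertedAt >= cleanupTime }
      }
    })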
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
index bb7c5c01c80d710ec547e9adbf5c72001b713cfd..188f8910da8d54f1b43a5a8d24fa08348a6d95cb 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -63,9 +63,9 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
 
   override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
 
-  override def size(): Int = internalMap.size()
+  override def size: Int = internalMap.size
 
-  override def foreach[U](f: ((A, B)) => U): Unit = {
+  override def foreach[U](f: ((A, B)) => U) {
     val iterator = internalMap.entrySet().iterator()
     while(iterator.hasNext) {
       val entry = iterator.next()
diff --git a/core/src/main/twirl/spark/deploy/common/layout.scala.html b/core/src/main/twirl/spark/common/layout.scala.html
similarity index 100%
rename from core/src/main/twirl/spark/deploy/common/layout.scala.html
rename to core/src/main/twirl/spark/common/layout.scala.html
diff --git a/core/src/main/twirl/spark/deploy/master/index.scala.html b/core/src/main/twirl/spark/deploy/master/index.scala.html
index 18c32e5a1f094b35c8dd0107eeadffcd8f09a63a..285645c38989504920f0f049475c4788f80c037c 100644
--- a/core/src/main/twirl/spark/deploy/master/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/index.scala.html
@@ -2,7 +2,7 @@
 @import spark.deploy.master._
 @import spark.Utils
 
-@spark.deploy.common.html.layout(title = "Spark Master on " + state.uri) {
+@spark.common.html.layout(title = "Spark Master on " + state.uri) {
       
   <!-- Cluster Details -->
   <div class="row">
diff --git a/core/src/main/twirl/spark/deploy/master/job_details.scala.html b/core/src/main/twirl/spark/deploy/master/job_details.scala.html
index dcf41c28f26f56fa289f7ef7a71c9067b47d1baf..d02a51b214180c26df713d583b23d9fc87db7872 100644
--- a/core/src/main/twirl/spark/deploy/master/job_details.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_details.scala.html
@@ -1,6 +1,6 @@
 @(job: spark.deploy.master.JobInfo)
 
-@spark.deploy.common.html.layout(title = "Job Details") {
+@spark.common.html.layout(title = "Job Details") {
       
   <!-- Job Details -->
   <div class="row">
diff --git a/core/src/main/twirl/spark/deploy/worker/index.scala.html b/core/src/main/twirl/spark/deploy/worker/index.scala.html
index b247307dab06793be7877bee00ffd1255bd4da8c..1d703dae58ccc3b5621f82e12c915648e38423f2 100644
--- a/core/src/main/twirl/spark/deploy/worker/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/index.scala.html
@@ -1,8 +1,7 @@
 @(worker: spark.deploy.WorkerState)
-
 @import spark.Utils
 
-@spark.deploy.common.html.layout(title = "Spark Worker on " + worker.uri) {
+@spark.common.html.layout(title = "Spark Worker on " + worker.uri) {
       
   <!-- Worker Details -->
   <div class="row">
diff --git a/core/src/main/twirl/spark/storage/index.scala.html b/core/src/main/twirl/spark/storage/index.scala.html
new file mode 100644
index 0000000000000000000000000000000000000000..2b337f61339b213bea1ac62941363708373e4c58
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/index.scala.html
@@ -0,0 +1,40 @@
+@(maxMem: Long, remainingMem: Long, diskSpaceUsed: Long, rdds: Array[spark.storage.RDDInfo], storageStatusList: Array[spark.storage.StorageStatus])
+@import spark.Utils
+
+@spark.common.html.layout(title = "Storage Dashboard") {
+  
+  <!-- High-Level Information -->
+  <div class="row">
+    <div class="span12">
+      <ul class="unstyled">
+        <li><strong>Memory:</strong> 
+          @{Utils.memoryBytesToString(maxMem - remainingMem)} Used 
+          (@{Utils.memoryBytesToString(remainingMem)} Available) </li>
+        <li><strong>Disk:</strong> @{Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
+      </ul>
+    </div>
+  </div>
+
+  <hr/>
+
+  <!-- RDD Summary -->
+  <div class="row">
+    <div class="span12">
+      <h3> RDD Summary </h3>
+      <br/>
+       @rdd_table(rdds)
+    </div>
+  </div>
+
+  <hr/>
+
+  <!-- Worker Summary -->
+  <div class="row">
+    <div class="span12">
+      <h3> Worker Summary </h3>
+      <br/>
+       @worker_table(storageStatusList)
+    </div>
+  </div>
+
+}
\ No newline at end of file
diff --git a/core/src/main/twirl/spark/storage/rdd.scala.html b/core/src/main/twirl/spark/storage/rdd.scala.html
new file mode 100644
index 0000000000000000000000000000000000000000..ac7f8c981fa830f62c64acb30d64067eec1ab793
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/rdd.scala.html
@@ -0,0 +1,77 @@
+@(rddInfo: spark.storage.RDDInfo, storageStatusList: Array[spark.storage.StorageStatus])
+@import spark.Utils
+
+@spark.common.html.layout(title = "RDD Info ") {
+  
+  <!-- High-Level Information -->
+  <div class="row">
+    <div class="span12">
+      <ul class="unstyled">
+        <li>
+          <strong>Storage Level:</strong> 
+          @(rddInfo.storageLevel.description)
+        <li>
+          <strong>Partitions:</strong>
+          @(rddInfo.numPartitions)
+        </li>
+        <li>
+          <strong>Memory Size:</strong>
+          @{Utils.memoryBytesToString(rddInfo.memSize)}
+        </li>
+        <li>
+          <strong>Disk Size:</strong>
+          @{Utils.memoryBytesToString(rddInfo.diskSize)}
+        </li>
+      </ul>
+    </div>
+  </div>
+
+  <hr/>
+
+  <!-- RDD Summary -->
+  <div class="row">
+    <div class="span12">
+      <h3> RDD Summary </h3>
+      <br/>
+      
+
+      <!-- Block Table Summary -->
+      <table class="table table-bordered table-striped table-condensed sortable">
+        <thead>
+          <tr>
+            <th>Block Name</th>
+            <th>Storage Level</th>
+            <th>Size in Memory</th>
+            <th>Size on Disk</th>
+          </tr>
+        </thead>
+        <tbody>
+          @storageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1).map { case (k,v) => 
+            <tr>
+              <td>@k</td>
+              <td>
+                 @(v.storageLevel.description)
+              </td>
+              <td>@{Utils.memoryBytesToString(v.memSize)}</td>
+              <td>@{Utils.memoryBytesToString(v.diskSize)}</td>
+            </tr>
+          }
+        </tbody>
+      </table>
+
+
+    </div>
+  </div>
+
+  <hr/>
+
+  <!-- Worker Table -->
+  <div class="row">
+    <div class="span12">
+      <h3> Worker Summary </h3>
+      <br/>
+       @worker_table(storageStatusList, "rdd_" + rddInfo.id )
+    </div>
+  </div>
+
+}
\ No newline at end of file
diff --git a/core/src/main/twirl/spark/storage/rdd_table.scala.html b/core/src/main/twirl/spark/storage/rdd_table.scala.html
new file mode 100644
index 0000000000000000000000000000000000000000..af801cf229569fb3110f599f64810063dc695156
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/rdd_table.scala.html
@@ -0,0 +1,30 @@
+@(rdds: Array[spark.storage.RDDInfo])
+@import spark.Utils
+
+<table class="table table-bordered table-striped table-condensed sortable">
+  <thead>
+    <tr>
+      <th>RDD Name</th>
+      <th>Storage Level</th>
+      <th>Partitions</th>
+      <th>Size in Memory</th>
+      <th>Size on Disk</th>
+    </tr>
+  </thead>
+  <tbody>
+    @for(rdd <- rdds) {
+      <tr>
+        <td>
+          <a href="rdd?id=@(rdd.id)">
+            @rdd.name
+          </a>
+        </td>
+        <td>@(rdd.storageLevel.description)
+        </td>
+        <td>@rdd.numPartitions</td>
+        <td>@{Utils.memoryBytesToString(rdd.memSize)}</td>
+        <td>@{Utils.memoryBytesToString(rdd.diskSize)}</td>
+      </tr>
+    }
+  </tbody>
+</table>
\ No newline at end of file
diff --git a/core/src/main/twirl/spark/storage/worker_table.scala.html b/core/src/main/twirl/spark/storage/worker_table.scala.html
new file mode 100644
index 0000000000000000000000000000000000000000..d54b8de4cc81394149cceac0d52b965483e95a7c
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/worker_table.scala.html
@@ -0,0 +1,24 @@
+@(workersStatusList: Array[spark.storage.StorageStatus], prefix: String = "")
+@import spark.Utils
+
+<table class="table table-bordered table-striped table-condensed sortable">
+  <thead>
+    <tr>
+      <th>Host</th>
+      <th>Memory Usage</th>
+      <th>Disk Usage</th>
+    </tr>
+  </thead>
+  <tbody>
+    @for(status <- workersStatusList) {
+      <tr>
+        <td>@(status.blockManagerId.ip + ":" + status.blockManagerId.port)</td>
+        <td>
+          @(Utils.memoryBytesToString(status.memUsed(prefix)))
+          (@(Utils.memoryBytesToString(status.memRemaining)) Total Available)
+        </td>
+        <td>@(Utils.memoryBytesToString(status.diskUsed(prefix)))</td>
+    </tr>
+    }
+  </tbody>
+</table>
\ No newline at end of file
diff --git a/core/src/test/scala/spark/DriverSuite.scala b/core/src/test/scala/spark/DriverSuite.scala
index 70a7c8bc2f5282afb0b3ac6fa53618778eeba70b..342610e1dd93b3a9e856ce2f5e40d47169e8dced 100644
--- a/core/src/test/scala/spark/DriverSuite.scala
+++ b/core/src/test/scala/spark/DriverSuite.scala
@@ -13,7 +13,8 @@ class DriverSuite extends FunSuite with Timeouts {
     val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
     forAll(masters) { (master: String) =>
       failAfter(10 seconds) {
-        Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master), new File(System.getenv("SPARK_HOME")))
+        Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master),
+          new File(System.getenv("SPARK_HOME")))
       }
     }
   }
@@ -28,4 +29,4 @@ object DriverWithoutCleanup {
     val sc = new SparkContext(args(0), "DriverWithoutCleanup")
     sc.parallelize(1 to 100, 4).count()
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 42ce6f3c749f90b57060d97d13528572410a8941..934e4c2f6793bda90335d561b3792080a8490f38 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -355,6 +355,34 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(11, pairs.count());
   }
 
+  @Test
+  public void mapsFromPairsToPairs() {
+      List<Tuple2<Integer, String>> pairs = Arrays.asList(
+              new Tuple2<Integer, String>(1, "a"),
+              new Tuple2<Integer, String>(2, "aa"),
+              new Tuple2<Integer, String>(3, "aaa")
+      );
+      JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+
+      // Regression test for SPARK-668:
+      JavaPairRDD<String, Integer> swapped = pairRDD.flatMap(
+          new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
+          @Override
+          public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
+              return Collections.singletonList(item.swap());
+          }
+      });
+      swapped.collect();
+
+      // There was never a bug here, but it's worth testing:
+      pairRDD.map(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
+          @Override
+          public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
+              return item.swap();
+          }
+      }).collect();
+  }
+
   @Test
   public void mapPartitions() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 718107d2b53f107b3476eb35a3c774422c546726..f4e7ec39fe301b4aa1b1a4e674ec95d73363cb85 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -43,13 +43,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
     val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
     val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
-    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("hostA", 1000),
+    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
         Array(compressedSize1000, compressedSize10000)))
-    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("hostB", 1000),
+    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
         Array(compressedSize10000, compressedSize1000)))
     val statuses = tracker.getServerStatuses(10, 0)
-    assert(statuses.toSeq === Seq((BlockManagerId("hostA", 1000), size1000),
-                                  (BlockManagerId("hostB", 1000), size10000)))
+    assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
+                                  (BlockManagerId("b", "hostB", 1000), size10000)))
     tracker.stop()
   }
 
@@ -61,47 +61,52 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
     val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
     val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
-    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("hostA", 1000),
+    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
         Array(compressedSize1000, compressedSize1000, compressedSize1000)))
-    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("hostB", 1000),
+    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
         Array(compressedSize10000, compressedSize1000, compressedSize1000)))
 
     // As if we had two simulatenous fetch failures
-    tracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))
-    tracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))
+    tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
+    tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
 
-    // The remaining reduce task might try to grab the output dispite the shuffle failure;
+    // The remaining reduce task might try to grab the output despite the shuffle failure;
     // this should cause it to fail, and the scheduler will ignore the failure due to the
     // stage already being aborted.
     intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
   }
 
   test("remote fetch") {
-    val (actorSystem, boundPort) =
-      AkkaUtils.createActorSystem("test", "localhost", 0)
-    System.setProperty("spark.driver.port", boundPort.toString)
-    val masterTracker = new MapOutputTracker(actorSystem, true)
-    val slaveTracker = new MapOutputTracker(actorSystem, false)
-    masterTracker.registerShuffle(10, 1)
-    masterTracker.incrementGeneration()
-    slaveTracker.updateGeneration(masterTracker.getGeneration)
-    intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+    try {
+      System.clearProperty("spark.driver.host")  // In case some previous test had set it
+      val (actorSystem, boundPort) =
+        AkkaUtils.createActorSystem("test", "localhost", 0)
+      System.setProperty("spark.driver.port", boundPort.toString)
+      val masterTracker = new MapOutputTracker(actorSystem, true)
+      val slaveTracker = new MapOutputTracker(actorSystem, false)
+      masterTracker.registerShuffle(10, 1)
+      masterTracker.incrementGeneration()
+      slaveTracker.updateGeneration(masterTracker.getGeneration)
+      intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
 
-    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
-    val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
-    masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("hostA", 1000), Array(compressedSize1000)))
-    masterTracker.incrementGeneration()
-    slaveTracker.updateGeneration(masterTracker.getGeneration)
-    assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
-           Seq((BlockManagerId("hostA", 1000), size1000)))
+      val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+      val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+      masterTracker.registerMapOutput(10, 0, new MapStatus(
+        BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+      masterTracker.incrementGeneration()
+      slaveTracker.updateGeneration(masterTracker.getGeneration)
+      assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+             Seq((BlockManagerId("a", "hostA", 1000), size1000)))
 
-    masterTracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))
-    masterTracker.incrementGeneration()
-    slaveTracker.updateGeneration(masterTracker.getGeneration)
-    intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+      masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
+      masterTracker.incrementGeneration()
+      slaveTracker.updateGeneration(masterTracker.getGeneration)
+      intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
 
-    // failure should be cached
-    intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+      // failure should be cached
+      intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+    } finally {
+      System.clearProperty("spark.driver.port")
+    }
   }
 }
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index a1aeb12f25ec91d1f91f267dcb0aebc1cee47496..2d177bbf6745891fb8995be18c865de7ca5bc85e 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -86,9 +86,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("BlockManagerId object caching") {
-    val id1 = BlockManagerId("XXX", 1)
-    val id2 = BlockManagerId("XXX", 1) // this should return the same object as id1
-    val id3 = BlockManagerId("XXX", 2) // this should return a different object
+    val id1 = BlockManagerId("e1", "XXX", 1)
+    val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1
+    val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object
     assert(id2 === id1, "id2 is not same as id1")
     assert(id2.eq(id1), "id2 is not the same object as id1")
     assert(id3 != id1, "id3 is same as id1")
@@ -103,7 +103,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("master + 1 manager interaction") {
-    store = new BlockManager(actorSystem, master, serializer, 2000)
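+    // BlockManager now takes the executor ID as its first argument; "<driver>" denotes the driver's own store.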
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -133,8 +133,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("master + 2 managers interaction") {
-    store = new BlockManager(actorSystem, master, serializer, 2000)
-    store2 = new BlockManager(actorSystem, master, new KryoSerializer, 2000)
+    store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
+    store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000)
 
     val peers = master.getPeers(store.blockManagerId, 1)
     assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -149,7 +149,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("removing block") {
-    store = new BlockManager(actorSystem, master, serializer, 2000)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -198,7 +198,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
 
   test("reregistration on heart beat") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
-    store = new BlockManager(actorSystem, master, serializer, 2000)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
     val a1 = new Array[Byte](400)
 
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -206,7 +206,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.getSingle("a1") != None, "a1 was not in store")
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
-    master.notifyADeadHost(store.blockManagerId.ip)
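+    // The master now removes block managers by executor ID rather than by host address.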
+    master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
     store invokePrivate heartBeat()
@@ -214,25 +214,63 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("reregistration on block update") {
-    store = new BlockManager(actorSystem, master, serializer, 2000)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
-
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
-    master.notifyADeadHost(store.blockManagerId.ip)
+    master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
     store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+    store.waitForAsyncReregister()
 
     assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
     assert(master.getLocations("a2").size > 0, "master was not told about a2")
   }
 
+  test("reregistration doesn't dead lock") {
+    val heartBeat = PrivateMethod[Unit]('heartBeat)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = List(new Array[Byte](400))
+
+    // try many times to trigger any deadlocks
+    for (i <- 1 to 100) {
+      master.removeExecutor(store.blockManagerId.executorId)
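+      // Each of the puts and the heartBeat below can trigger an asynchronous reregistration;
+      // the joins would hang here if that path deadlocked.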
+      val t1 = new Thread {
+        override def run() {
+          store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, true)
+        }
+      }
+      val t2 = new Thread {
+        override def run() {
+          store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+        }
+      }
+      val t3 = new Thread {
+        override def run() {
+          store invokePrivate heartBeat()
+        }
+      }
+
+      t1.start()
+      t2.start()
+      t3.start()
+      t1.join()
+      t2.join()
+      t3.join()
+
+      store.dropFromMemory("a1", null)
+      store.dropFromMemory("a2", null)
+      store.waitForAsyncReregister()
+    }
+  }
+
   test("in-memory LRU storage") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -251,7 +289,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU storage with serialization") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -270,14 +308,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU for partitions of same RDD") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
     store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
-    // Even though we accessed rdd_0_3 last, it should not have replaced partitiosn 1 and 2
+    // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
     // from the same RDD
     assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
     assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
@@ -289,7 +327,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU for partitions of multiple RDDs") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -312,7 +350,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("on-disk storage") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -325,7 +363,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -340,7 +378,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage with getLocalBytes") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -355,7 +393,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage with serialization") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -370,7 +408,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("disk and memory storage with serialization and getLocalBytes") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -385,7 +423,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("LRU with mixed storage levels") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -410,7 +448,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("in-memory LRU with streams") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -434,7 +472,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("LRU with mixed storage levels and streams") {
-    store = new BlockManager(actorSystem, master, serializer, 1200)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -480,7 +518,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }
 
   test("overly large block") {
-    store = new BlockManager(actorSystem, master, serializer, 500)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 500)
     store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingle("a1") === None, "a1 was in store")
     store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -491,49 +529,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   test("block compression") {
     try {
       System.setProperty("spark.shuffle.compress", "true")
-      store = new BlockManager(actorSystem, master, serializer, 2000)
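+      // Each BlockManager in this test registers under its own executor ID (exec1 through exec7).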
+      store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
       store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.shuffle.compress", "false")
-      store = new BlockManager(actorSystem, master, serializer, 2000)
+      store = new BlockManager("exec2", actorSystem, master, serializer, 2000)
       store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.broadcast.compress", "true")
-      store = new BlockManager(actorSystem, master, serializer, 2000)
+      store = new BlockManager("exec3", actorSystem, master, serializer, 2000)
       store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.broadcast.compress", "false")
-      store = new BlockManager(actorSystem, master, serializer, 2000)
+      store = new BlockManager("exec4", actorSystem, master, serializer, 2000)
       store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.rdd.compress", "true")
-      store = new BlockManager(actorSystem, master, serializer, 2000)
+      store = new BlockManager("exec5", actorSystem, master, serializer, 2000)
       store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
       store.stop()
       store = null
 
       System.setProperty("spark.rdd.compress", "false")
-      store = new BlockManager(actorSystem, master, serializer, 2000)
+      store = new BlockManager("exec6", actorSystem, master, serializer, 2000)
       store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
       store.stop()
       store = null
 
       // Check that any other block types are also kept uncompressed
-      store = new BlockManager(actorSystem, master, serializer, 2000)
+      store = new BlockManager("exec7", actorSystem, master, serializer, 2000)
       store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
       store.stop()
diff --git a/sbt/sbt b/sbt/sbt
index a3055c13c1b0fb420f901122e59f4a2cf01f3ecd..8f426d18e892facbc84fe1fe47edc8bc3a0f24ea 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -5,4 +5,4 @@ if [ "$MESOS_HOME" != "" ]; then
 fi
 export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd)
 export SPARK_TESTING=1  # To put test classes on classpath
-java -Xmx1200M -XX:MaxPermSize=200m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
+java -Xmx1200M -XX:MaxPermSize=250m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"