diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index d48c1892aea9c20b53312b501f3731eeeefacec3..f4eb1601be3e4409f3ac859992ca61a03ce906de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -30,20 +30,24 @@ import scala.sys.process._
 import org.json4s._
 import org.json4s.jackson.JsonMethods
 
-import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.deploy.master.RecoveryState
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
 
 /**
  * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
  * In order to mimic a real distributed cluster more closely, Docker is used.
  * Execute using
- * ./spark-class org.apache.spark.deploy.FaultToleranceTest
+ * ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest
  *
- * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS:
+ * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
+ * *and* SPARK_JAVA_OPTS:
  *   - spark.deploy.recoveryMode=ZOOKEEPER
  *   - spark.deploy.zookeeper.url=172.17.42.1:2181
  * Note that 172.17.42.1 is the default docker ip for the host and 2181 is the default ZK port.
  *
+ * In case of failure, make sure to kill off prior docker containers before restarting:
+ *   docker kill $(docker ps -q)
+ *
  * Unfortunately, due to the Docker dependency this suite cannot be run automatically without a
  * working installation of Docker. In addition to having Docker, the following are assumed:
  *   - Docker can run without sudo (see http://docs.docker.io/en/latest/use/basics/)
@@ -51,10 +55,16 @@ import org.apache.spark.deploy.master.RecoveryState
  *     docker/ directory. Run 'docker/spark-test/build' to generate these.
  */
 private[spark] object FaultToleranceTest extends App with Logging {
+
+  val conf = new SparkConf()
+  val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
+
   val masters = ListBuffer[TestMasterInfo]()
   val workers = ListBuffer[TestWorkerInfo]()
   var sc: SparkContext = _
 
+  val zk =  SparkCuratorUtil.newClient(conf)
+
   var numPassed = 0
   var numFailed = 0
 
@@ -72,6 +82,10 @@ private[spark] object FaultToleranceTest extends App with Logging {
       sc = null
     }
     terminateCluster()
+
+    // Clear ZK directories in between tests (for speed purposes)
+    SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
+    SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
   }
 
   test("sanity-basic") {
@@ -168,26 +182,34 @@ private[spark] object FaultToleranceTest extends App with Logging {
     try {
       fn
       numPassed += 1
+      logInfo("==============================================")
       logInfo("Passed: " + name)
+      logInfo("==============================================")
     } catch {
       case e: Exception =>
         numFailed += 1
+        logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
         logError("FAILED: " + name, e)
+        logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
+        sys.exit(1)
     }
     afterEach()
   }
 
   def addMasters(num: Int) {
+    logInfo(s">>>>> ADD MASTERS $num <<<<<")
     (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
   }
 
   def addWorkers(num: Int) {
+    logInfo(s">>>>> ADD WORKERS $num <<<<<")
     val masterUrls = getMasterUrls(masters)
     (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
   }
 
   /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
   def createClient() = {
+    logInfo(">>>>> CREATE CLIENT <<<<<")
     if (sc != null) { sc.stop() }
     // Counter-hack: Because of a hack in SparkEnv#create() that changes this
     // property, we need to reset it.
@@ -206,6 +228,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   }
 
   def killLeader(): Unit = {
+    logInfo(">>>>> KILL LEADER <<<<<")
     masters.foreach(_.readState())
     val leader = getLeader
     masters -= leader
@@ -215,6 +238,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)
 
   def terminateCluster() {
+    logInfo(">>>>> TERMINATE CLUSTER <<<<<")
     masters.foreach(_.kill())
     workers.foreach(_.kill())
     masters.clear()
@@ -245,6 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
    * are all alive in a proper configuration (e.g., only one leader).
    */
   def assertValidClusterState() = {
+    logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
     assertUsable()
     var numAlive = 0
     var numStandby = 0
@@ -326,7 +351,11 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val
 
       val workers = json \ "workers"
       val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
-      liveWorkerIPs = liveWorkers.map(w => (w \ "host").extract[String])
+      // Extract the worker IP from "webuiaddress" (rather than "host") because the host name
+      // on containers is a weird hash instead of the actual IP address.
+      liveWorkerIPs = liveWorkers.map {
+        w => (w \ "webuiaddress").extract[String].stripPrefix("http://").stripSuffix(":8081")
+      }
 
       numLiveApps = (json \ "activeapps").children.size
 
@@ -403,7 +432,7 @@ private[spark] object Docker extends Logging {
   def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
     val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""
 
-    val cmd = "docker run %s %s %s".format(mountCmd, imageTag, args)
+    val cmd = "docker run -privileged %s %s %s".format(mountCmd, imageTag, args)
     logDebug("Run command: " + cmd)
     cmd
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2d6d0c33fac7ebf91bf91698092af23a82135b76..b8dfa441025834923a832e9886b9a762ad7796e9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -531,8 +531,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,
 
     val workerAddress = worker.actor.path.address
     if (addressToWorker.contains(workerAddress)) {
-      logInfo("Attempted to re-register worker at same address: " + workerAddress)
-      return false
+      val oldWorker = addressToWorker(workerAddress)
+      if (oldWorker.state == WorkerState.UNKNOWN) {
+        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
+        // The old worker must thus be dead, so we will remove it and accept the new worker.
+        removeWorker(oldWorker)
+      } else {
+        logInfo("Attempted to re-register worker at same address: " + workerAddress)
+        return false
+      }
     }
 
     workers += worker
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
index 2d35397035a03a25014745bcd79e5e1e4b9ae56d..4781a80d470e1a11b173137f955d11b2bfa535a2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.deploy.master
 
-import org.apache.spark.{SparkConf, Logging}
+import scala.collection.JavaConversions._
+
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.zookeeper.KeeperException
 
+import org.apache.spark.{Logging, SparkConf}
 
 object SparkCuratorUtil extends Logging {
 
@@ -50,4 +52,13 @@ object SparkCuratorUtil extends Logging {
       }
     }
   }
+
+  def deleteRecursive(zk: CuratorFramework, path: String) {
+    if (zk.checkExists().forPath(path) != null) {
+      for (child <- zk.getChildren.forPath(path)) {
+        zk.delete().forPath(path + "/" + child)
+      }
+      zk.delete().forPath(path)
+    }
+  }
 }
diff --git a/docker/README.md b/docker/README.md
index bf59e77d111f9c121ea64b949dba3a557a38622d..40ba9c3065946a6db2bb4fffdd54a3de454c1da1 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -2,4 +2,6 @@ Spark docker files
 ===========
 
 Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles),
-as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
\ No newline at end of file
+as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
+
+Tested with Docker version 0.8.1.
diff --git a/docker/spark-test/master/default_cmd b/docker/spark-test/master/default_cmd
index a5b1303c2ebdb957d3f2249fd13a13f537582599..5a7da3446f6d2f784a236f0e06683fdbd26029b7 100755
--- a/docker/spark-test/master/default_cmd
+++ b/docker/spark-test/master/default_cmd
@@ -19,4 +19,10 @@
 
 IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
 echo "CONTAINER_IP=$IP"
-/opt/spark/spark-class org.apache.spark.deploy.master.Master -i $IP
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -i $IP
diff --git a/docker/spark-test/worker/default_cmd b/docker/spark-test/worker/default_cmd
index ab6336f70c1c6a3913a8f217d0619514ac287752..31b06cb0eb0479231d04f5c8ced60612bb295a7a 100755
--- a/docker/spark-test/worker/default_cmd
+++ b/docker/spark-test/worker/default_cmd
@@ -19,4 +19,10 @@
 
 IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
 echo "CONTAINER_IP=$IP"
-/opt/spark/spark-class org.apache.spark.deploy.worker.Worker $1
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker $1