diff --git a/bin/stop-slaves.sh b/bin/stop-slaves.sh
index 03e416a13274d5c94caa381257332ec42bb31fd1..abf1c7be6517d5ecbe597c529f5cee88ee306120 100755
--- a/bin/stop-slaves.sh
+++ b/bin/stop-slaves.sh
@@ -17,7 +17,7 @@
 # limitations under the License.
 #
 
-# Starts the master on the machine this script is executed on.
+# Stops workers on the machines specified in the conf/slaves file.
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 1cfff5e565d173c730e65ff301b26b59d7c31085..0d0745a480c4a50a13437818227bb5a4941d4c73 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -52,6 +52,8 @@ private[deploy] object DeployMessages {
       exitStatus: Option[Int])
     extends DeployMessage
 
+  case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription])
+    extends DeployMessage
+
   case class Heartbeat(workerId: String) extends DeployMessage
 
   // Master to Worker
@@ -76,6 +78,8 @@ private[deploy] object DeployMessages {
   case class RegisterApplication(appDescription: ApplicationDescription)
     extends DeployMessage
 
+  case class MasterChangeAcknowledged(appId: String) extends DeployMessage
+
   // Master to Client
 
   case class RegisteredApplication(appId: String) extends DeployMessage
@@ -94,6 +98,10 @@ private[deploy] object DeployMessages {
 
   case object StopClient
 
+  // Master to Worker & Client
+
+  case class MasterChanged(masterUrl: String, masterWebUiUrl: String) extends DeployMessage
+
   // MasterWebUI To Master
 
   case object RequestMasterState
@@ -127,6 +135,10 @@ private[deploy] object DeployMessages {
 
   case object CheckForWorkerTimeOut
 
+  case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo])
+
+  case object EndRecoveryProcess
+
   case object RequestWebUIPort
 
   case class WebUIPortResponse(webUIBoundPort: Int)
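
Taken together, the new messages form the recovery handshake: a restarted Master sends itself BeginRecovery after reading persisted state, broadcasts MasterChanged to every stored worker and application driver, collects WorkerSchedulerStateResponse and MasterChangeAcknowledged replies, and falls back to EndRecoveryProcess on a timeout. A minimal dispatch sketch of who handles what follows; handleRecoveryMessage is an illustrative stand-in, and the real handlers are in Master.scala, Worker.scala and Client.scala later in this patch.

    // Sketch only: direction of each new message. Real handlers appear further down this patch.
    import org.apache.spark.deploy.DeployMessages._

    def handleRecoveryMessage(msg: Any): Unit = msg match {
      case MasterChanged(masterUrl, masterWebUiUrl) =>
        // Worker/Client side: re-point at the new master, then report local state back
        // (a worker replies with WorkerSchedulerStateResponse, a client with MasterChangeAcknowledged).
      case MasterChangeAcknowledged(appId) =>
        // Master side: the application has re-attached; move it from UNKNOWN back to WAITING.
      case WorkerSchedulerStateResponse(workerId, executors) =>
        // Master side: mark the worker ALIVE and rebuild its executor bookkeeping.
      case BeginRecovery(storedApps, storedWorkers) =>
        // Master to itself on startup, only when persisted state was found.
      case EndRecoveryProcess =>
        // Master to itself once the recovery timeout fires; unresponsive apps/workers are removed.
      case _ =>
        // everything else is unrelated to recovery
    }
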
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
new file mode 100644
index 0000000000000000000000000000000000000000..716ee483d5cf32031d07d9a558fa855ba5bed23f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+private[spark] class ExecutorDescription(
+    val appId: String,
+    val execId: Int,
+    val cores: Int,
+    val state: ExecutorState.Value)
+  extends Serializable {
+
+  override def toString: String =
+    "ExecutorState(appId=%s, execId=%d, cores=%d, state=%s)".format(appId, execId, cores, state)
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index a342dd724a8ed74bbc1b80bc5d2eb309ded1e7c6..28548a2ca913bf09c7c56a1719929fd7556602e6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -92,20 +92,25 @@ private[spark] class Client(
           listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
         }
 
+      case MasterChanged(masterUrl, masterWebUiUrl) =>
+        logInfo("Master has changed, new master is at " + masterUrl)
+        context.unwatch(master)
+        master = context.actorFor(Master.toAkkaUrl(masterUrl))
+        masterAddress = master.path.address
+        sender ! MasterChangeAcknowledged(appId)
+        context.watch(master)
+
       case Terminated(actor_) if actor_ == master =>
-        logError("Connection to master failed; stopping client")
+        logError("Connection to master failed; waiting for master to reconnect...")
         markDisconnected()
-        context.stop(self)
 
       case RemoteClientDisconnected(transport, address) if address == masterAddress =>
-        logError("Connection to master failed; stopping client")
+        logError("Connection to master failed; waiting for master to reconnect...")
         markDisconnected()
-        context.stop(self)
 
       case RemoteClientShutdown(transport, address) if address == masterAddress =>
-        logError("Connection to master failed; stopping client")
+        logError("Connection to master failed; waiting for master to reconnect...")
         markDisconnected()
-        context.stop(self)
 
       case StopClient =>
         markDisconnected()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index bd5327627a832fc3b28202920f889bbd421d7ec5..e437a0e7ae5b023307b70973064d7e9fdcdbe611 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -23,29 +23,52 @@ import akka.actor.ActorRef
 import scala.collection.mutable
 
 private[spark] class ApplicationInfo(
-    val startTime: Long,
-    val id: String,
-    val desc: ApplicationDescription,
-    val submitDate: Date,
-    val driver: ActorRef,
-    val appUiUrl: String)
-{
-  var state = ApplicationState.WAITING
-  var executors = new mutable.HashMap[Int, ExecutorInfo]
-  var coresGranted = 0
-  var endTime = -1L
-  val appSource = new ApplicationSource(this)
-
-  private var nextExecutorId = 0
-
-  def newExecutorId(): Int = {
-    val id = nextExecutorId
-    nextExecutorId += 1
-    id
+  val startTime: Long,
+  val id: String,
+  val desc: ApplicationDescription,
+  val submitDate: Date,
+  val driver: ActorRef,
+  val appUiUrl: String)
+  extends Serializable {
+
+  @transient var state: ApplicationState.Value = _
+  @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
+  @transient var coresGranted: Int = _
+  @transient var endTime: Long = _
+  @transient var appSource: ApplicationSource = _
+
+  @transient private var nextExecutorId: Int = _
+
+  init()
+
+  private def readObject(in: java.io.ObjectInputStream): Unit = {
+    in.defaultReadObject()
+    init()
+  }
+
+  private def init() {
+    state = ApplicationState.WAITING
+    executors = new mutable.HashMap[Int, ExecutorInfo]
+    coresGranted = 0
+    endTime = -1L
+    appSource = new ApplicationSource(this)
+    nextExecutorId = 0
+  }
+
+  private def newExecutorId(useID: Option[Int] = None): Int = {
+    useID match {
+      case Some(id) =>
+        nextExecutorId = math.max(nextExecutorId, id + 1)
+        id
+      case None =>
+        val id = nextExecutorId
+        nextExecutorId += 1
+        id
+    }
   }
 
-  def addExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
-    val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
+  def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
+    val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
     executors(exec.id) = exec
     coresGranted += cores
     exec
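
ApplicationInfo (and WorkerInfo further down) is now serialized by the persistence engine (Java serialization under Akka's default serializer), so all of its runtime bookkeeping is marked @transient and rebuilt by init(), which the custom readObject hook re-runs on the deserializing side. The standalone sketch below shows the same pattern with hypothetical names (Counter, hits); without the readObject hook the transient field would come back null.

    // Minimal sketch of the @transient + readObject pattern used by ApplicationInfo/WorkerInfo.
    import java.io._
    import scala.collection.mutable.ArrayBuffer

    class Counter(val name: String) extends Serializable {
      @transient var hits: ArrayBuffer[Long] = _

      init()

      // Java deserialization bypasses the constructor, so re-run init() here.
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        init()
      }

      private def init() {
        hits = new ArrayBuffer[Long]
      }
    }

    // Round trip: the transient buffer comes back empty but usable.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(new Counter("requests"))
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val restored = in.readObject().asInstanceOf[Counter]
    restored.hits += 1L   // would throw a NullPointerException without the readObject hook
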
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
index 7e804223cf48a6459a7c60caa50cff33e7675d89..fedf879eff73bf1abaae089a84c1a8a5efe00c89 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -18,11 +18,11 @@
 package org.apache.spark.deploy.master
 
 private[spark] object ApplicationState
-  extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
+  extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED", "UNKNOWN") {
 
   type ApplicationState = Value
 
-  val WAITING, RUNNING, FINISHED, FAILED = Value
+  val WAITING, RUNNING, FINISHED, FAILED, UNKNOWN = Value
 
   val MAX_NUM_RETRY = 10
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
index cf384a985e90ede61afa0eb7267d636af0eada5e..d235234c13b36cadcc13c053b7855ee9b3fe0a0e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.master
 
-import org.apache.spark.deploy.ExecutorState
+import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 
 private[spark] class ExecutorInfo(
     val id: Int,
@@ -28,5 +28,10 @@ private[spark] class ExecutorInfo(
 
   var state = ExecutorState.LAUNCHING
 
+  /** Copy all state variables from the given on-the-wire ExecutorDescription. */
+  def copyState(execDesc: ExecutorDescription) {
+    state = execDesc.state
+  }
+
   def fullId: String = application.id + "/" + id
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
new file mode 100644
index 0000000000000000000000000000000000000000..2fc13821bde2ce9b5e04c75a6db15968ae5f2645
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.io._
+
+import scala.Serializable
+
+import akka.serialization.Serialization
+import org.apache.spark.Logging
+
+/**
+ * Stores data in a single on-disk directory with one file per application and worker.
+ * Files are deleted when applications and workers are removed.
+ *
+ * @param dir Directory to store files. Created if non-existent (but not recursively).
+ * @param serialization Used to serialize our objects.
+ */
+private[spark] class FileSystemPersistenceEngine(
+  val dir: String,
+  val serialization: Serialization)
+  extends PersistenceEngine with Logging {
+
+  new File(dir).mkdir()
+
+  override def addApplication(app: ApplicationInfo) {
+    val appFile = new File(dir + File.separator + "app_" + app.id)
+    serializeIntoFile(appFile, app)
+  }
+
+  override def removeApplication(app: ApplicationInfo) {
+    new File(dir + File.separator + "app_" + app.id).delete()
+  }
+
+  override def addWorker(worker: WorkerInfo) {
+    val workerFile = new File(dir + File.separator + "worker_" + worker.id)
+    serializeIntoFile(workerFile, worker)
+  }
+
+  override def removeWorker(worker: WorkerInfo) {
+    new File(dir + File.separator + "worker_" + worker.id).delete()
+  }
+
+  override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+    val sortedFiles = new File(dir).listFiles().sortBy(_.getName())
+    val appFiles = sortedFiles.filter(_.getName().startsWith("app_"))
+    val apps = appFiles.map(deserializeFromFile[ApplicationInfo](_))
+    val workerFiles = sortedFiles.filter(_.getName().startsWith("worker_"))
+    val workers = workerFiles.map(deserializeFromFile[WorkerInfo](_))
+    (apps, workers)
+  }
+
+  private def serializeIntoFile(file: File, value: Serializable) {
+    val created = file.createNewFile()
+    if (!created) { throw new IllegalStateException("Could not create file: " + file) }
+
+    val serializer = serialization.findSerializerFor(value)
+    val serialized = serializer.toBinary(value)
+
+    val out = new FileOutputStream(file)
+    out.write(serialized)
+    out.close()
+  }
+
+  private def deserializeFromFile[T <: Serializable](file: File)(implicit m: Manifest[T]): T = {
+    val fileData = new Array[Byte](file.length().toInt)
+    val dis = new DataInputStream(new FileInputStream(file))
+    dis.readFully(fileData)
+    dis.close()
+
+    val clazz = m.erasure.asInstanceOf[Class[T]]
+    val serializer = serialization.serializerFor(clazz)
+    serializer.fromBinary(fileData).asInstanceOf[T]
+  }
+}
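
FileSystemPersistenceEngine delegates the byte conversion to Akka's Serialization extension rather than rolling its own, so it needs a live ActorSystem. A hedged wiring sketch follows (the actor-system name and directory are arbitrary); inside the Master it is constructed with SerializationExtension(context.system), as the Master.scala changes further down show.

    // Sketch: standing the engine up outside the Master, e.g. in a test harness.
    import akka.actor.ActorSystem
    import akka.serialization.SerializationExtension

    val system = ActorSystem("persistence-test")
    val engine = new FileSystemPersistenceEngine("/tmp/spark-recovery",
      SerializationExtension(system))

    // addWorker/addApplication write one file per object ("worker_<id>", "app_<id>");
    // a restarted Master gets everything back in a single call:
    //   val (storedApps, storedWorkers) = engine.readPersistedData()

    system.shutdown()
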
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index bde59905bc7646ab20170500c331cf77ecdf294a..c6e039eed43374f2a2234d0dcac20c439c839bbd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -27,16 +27,17 @@ import akka.actor.Terminated
 import akka.dispatch.Await
 import akka.pattern.ask
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.serialization.SerializationExtension
 import akka.util.duration._
-import akka.util.Timeout
+import akka.util.{Duration, Timeout}
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.MasterState.MasterState
 import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
-import akka.util.{Duration, Timeout}
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
@@ -44,7 +45,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
   val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
   val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
- 
+  val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
+
   var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
   val idToWorker = new HashMap[String, WorkerInfo]
@@ -74,6 +76,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     if (envVar != null) envVar else host
   }
 
+  var state: MasterState = _
+
+  var persistenceEngine: PersistenceEngine = _
+
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
@@ -89,6 +95,23 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     masterMetricsSystem.registerSource(masterSource)
     masterMetricsSystem.start()
     applicationMetricsSystem.start()
+
+    persistenceEngine =
+      if (RECOVERY_DIR.isEmpty()) {
+        new BlackHolePersistenceEngine()
+      } else {
+        logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
+        new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
+      }
+
+    val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
+    state =
+      if (storedApps.isEmpty && storedWorkers.isEmpty) {
+        MasterState.ALIVE
+      } else {
+        self ! BeginRecovery(storedApps, storedWorkers)
+        MasterState.RECOVERING
+      }
   }
 
   override def postStop() {
@@ -98,14 +121,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   }
 
   override def receive = {
-    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
+    case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         host, workerPort, cores, Utils.megabytesToString(memory)))
       if (idToWorker.contains(id)) {
         sender ! RegisterWorkerFailed("Duplicate worker ID")
       } else {
-        addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
+        val worker = new WorkerInfo(id, host, workerPort, cores, memory, sender, webUiPort, publicAddress)
+        registerWorker(worker)
         context.watch(sender)  // This doesn't work with remote actors but helps for testing
+        persistenceEngine.addWorker(worker)
         sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
         schedule()
       }
@@ -113,10 +138,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
     case RegisterApplication(description) => {
       logInfo("Registering app " + description.name)
-      val app = addApplication(description, sender)
+      val app = createApplication(description, sender)
+      registerApplication(app)
       logInfo("Registered app " + description.name + " with ID " + app.id)
-      waitingApps += app
       context.watch(sender)  // This doesn't work with remote actors but helps for testing
+      persistenceEngine.addApplication(app)
       sender ! RegisteredApplication(app.id)
       schedule()
     }
@@ -158,23 +184,78 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       }
     }
 
+    case BeginRecovery(storedApps, storedWorkers) => {
+      context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, EndRecoveryProcess)
+
+      val masterUrl = "spark://" + host + ":" + port
+      val masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
+      for (app <- storedApps) {
+        registerApplication(app)
+        app.state = ApplicationState.UNKNOWN
+        app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
+      }
+      for (worker <- storedWorkers) {
+        registerWorker(worker)
+        worker.state = WorkerState.UNKNOWN
+        worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
+      }
+    }
+
+    case MasterChangeAcknowledged(appId) => {
+      val appOption = idToApp.get(appId)
+      appOption match {
+        case Some(app) =>
+          app.state = ApplicationState.WAITING
+        case None =>
+          logWarning("Master change ack from unknown app: " + appId)
+      }
+
+      if (canCompleteRecovery) { completeRecovery() }
+    }
+
+    case WorkerSchedulerStateResponse(workerId, executors) => {
+      idToWorker.get(workerId) match {
+        case Some(worker) =>
+          worker.state = WorkerState.ALIVE
+
+          val validExecutors = executors.filter(exec => idToApp.contains(exec.appId))
+          for (exec <- validExecutors) {
+            val app = idToApp(exec.appId)
+            val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
+            worker.addExecutor(execInfo)
+            execInfo.copyState(exec)
+          }
+        case None =>
+          logWarning("Scheduler state from unknown worker: " + workerId)
+      }
+
+      if (canCompleteRecovery) { completeRecovery() }
+    }
+
+    case EndRecoveryProcess => {
+      completeRecovery()
+    }
+
     case Terminated(actor) => {
       // The disconnected actor could've been either a worker or an app; remove whichever of
       // those we have an entry for in the corresponding actor hashmap
       actorToWorker.get(actor).foreach(removeWorker)
       actorToApp.get(actor).foreach(finishApplication)
+      if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }
 
     case RemoteClientDisconnected(transport, address) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       addressToWorker.get(address).foreach(removeWorker)
       addressToApp.get(address).foreach(finishApplication)
+      if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }
 
     case RemoteClientShutdown(transport, address) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       addressToWorker.get(address).foreach(removeWorker)
       addressToApp.get(address).foreach(finishApplication)
+      if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }
 
     case RequestMasterState => {
@@ -190,6 +271,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
   }
 
+  def canCompleteRecovery =
+    workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
+      apps.count(_.state == ApplicationState.UNKNOWN) == 0
+
+  def completeRecovery() {
+    synchronized {
+      if (state != MasterState.RECOVERING) { return }
+      state = MasterState.COMPLETING_RECOVERY
+    }
+
+    // Kill off any workers and apps that didn't respond to us.
+    workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker(_))
+    apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication(_))
+
+    state = MasterState.ALIVE
+    schedule()
+    logInfo("Recovery complete - resuming operations!")
+  }
+
   /**
    * Can an app use the given worker? True if the worker has enough memory and we haven't already
    * launched an executor for the app on it (right now the standalone backend doesn't like having
@@ -204,6 +304,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
    * every time a new app joins or resource availability changes.
    */
   def schedule() {
+    if (state != MasterState.ALIVE) { return }
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
     if (spreadOutApps) {
@@ -257,8 +358,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
   }
 
-  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
-    publicAddress: String): WorkerInfo = {
+  def registerWorker(worker: WorkerInfo): Unit = {
     // There may be one or more refs to dead workers on this same node (w/ different ID's),
     // remove them.
     workers.filter { w =>
@@ -266,12 +366,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }.foreach { w =>
       workers -= w
     }
-    val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+
+    val workerAddress = worker.actor.path.address
+    if (addressToWorker.contains(workerAddress)) {
+      logInfo("Attempted to re-register worker at same address: " + workerAddress)
+      return
+    }
+
     workers += worker
     idToWorker(worker.id) = worker
-    actorToWorker(sender) = worker
+    actorToWorker(worker.actor) = worker
-    addressToWorker(sender.path.address) = worker
-    worker
+    addressToWorker(workerAddress) = worker
   }
 
   def removeWorker(worker: WorkerInfo) {
@@ -286,25 +391,35 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         exec.id, ExecutorState.LOST, Some("worker lost"), None)
       exec.application.removeExecutor(exec)
     }
+    persistenceEngine.removeWorker(worker)
   }
 
-  def addApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
+  def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
     val now = System.currentTimeMillis()
     val date = new Date(now)
-    val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+    new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+  }
+
+  def registerApplication(app: ApplicationInfo): Unit = {
+    val appAddress = app.driver.path.address
+    if (addressToApp.contains(appAddress)) {
+      logInfo("Attempted to re-register application at same address: " + appAddress)
+      return
+    }
+
     applicationMetricsSystem.registerSource(app.appSource)
     apps += app
     idToApp(app.id) = app
-    actorToApp(driver) = app
-    addressToApp(driver.path.address) = app
+    actorToApp(app.driver) = app
+    addressToApp(appAddress) = app
     if (firstApp == None) {
       firstApp = Some(app)
     }
     val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
-    if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
+    if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) {
       logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
     }
-    app
+    waitingApps += app
   }
 
   def finishApplication(app: ApplicationInfo) {
@@ -336,6 +451,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       if (state != ApplicationState.FINISHED) {
         app.driver ! ApplicationRemoved(state.toString)
       }
+      persistenceEngine.removeApplication(app)
       schedule()
     }
   }
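
Recovery is opt-in: unless spark.deploy.recoveryDirectory is set before the Master starts, the BlackHolePersistenceEngine is used and behaviour is unchanged. When it is set, a restarted Master enters RECOVERING, waits up to spark.worker.timeout (60 seconds by default, the same WORKER_TIMEOUT used for heartbeats) for stored workers and applications to re-attach, and then drops whatever stayed UNKNOWN. A sketch of enabling it follows; the directory is only an example and should be reachable by whichever host the replacement Master runs on (for instance a shared NFS mount).

    // Sketch: enabling file-system recovery. The path is an example, not a required location.
    System.setProperty("spark.deploy.recoveryDirectory", "/mnt/shared/spark-recovery")

    // Equivalent JVM flag when launching the master, e.g. through the daemon Java options
    // (SPARK_DAEMON_JAVA_OPTS) in conf/spark-env.sh:
    //   -Dspark.deploy.recoveryDirectory=/mnt/shared/spark-recovery
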
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9ea5e9752e0b763c3752691220cd6d5967228d50
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+private[spark] object MasterState
+  extends Enumeration("ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
+
+  type MasterState = Value
+
+  val ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
new file mode 100644
index 0000000000000000000000000000000000000000..07d23c6bf3438c558d8e168eed4af5433bf908a1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+/**
+ * Allows Master to persist any state that is necessary in order to recover from a failure.
+ * The following semantics are required:
+ *   - addApplication and addWorker are called before completing registration of a new app/worker.
+ *   - removeApplication and removeWorker may be called at any time.
+ * Given these two requirements, every registered app and worker will have been persisted, but
+ * some that have already finished may not yet have been deleted (their liveness is re-verified
+ * during recovery).
+ */
+trait PersistenceEngine {
+  def addApplication(app: ApplicationInfo)
+
+  def removeApplication(app: ApplicationInfo)
+
+  def addWorker(worker: WorkerInfo)
+
+  def removeWorker(worker: WorkerInfo)
+
+  /**
+   * Returns the persisted data sorted by their respective ids (which implies that they are
+   * sorted in order of creation time).
+   */
+  def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo])
+}
+
+class BlackHolePersistenceEngine extends PersistenceEngine {
+  override def addApplication(app: ApplicationInfo) {}
+  override def removeApplication(app: ApplicationInfo) {}
+  override def addWorker(worker: WorkerInfo) {}
+  override def removeWorker(worker: WorkerInfo) {}
+  override def readPersistedData() = (Nil, Nil)
+}
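
The contract is deliberately small, which makes it easy to slot in other backends. Below is a hypothetical in-memory engine (not part of this patch, and assumed to live alongside the trait in org.apache.spark.deploy.master) that satisfies the ordering guarantee by preserving insertion order, which for ids handed out by a single Master matches creation order; something like it is handy for exercising the recovery path in tests.

    // Hypothetical in-memory engine honouring the same contract as the file-system one.
    import scala.collection.mutable

    private[spark] class InMemoryPersistenceEngine extends PersistenceEngine {
      private val apps = new mutable.LinkedHashMap[String, ApplicationInfo]
      private val workers = new mutable.LinkedHashMap[String, WorkerInfo]

      override def addApplication(app: ApplicationInfo) { apps(app.id) = app }
      override def removeApplication(app: ApplicationInfo) { apps -= app.id }
      override def addWorker(worker: WorkerInfo) { workers(worker.id) = worker }
      override def removeWorker(worker: WorkerInfo) { workers -= worker.id }
      override def readPersistedData() = (apps.values.toSeq, workers.values.toSeq)
    }
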
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 6219f11f2a2b2c3b7b02bc7a56b7643ca8b1020f..2ab7bb233c1ccc1dc74241facfe8ecfea12ea2df 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -29,21 +29,37 @@ private[spark] class WorkerInfo(
   val memory: Int,
   val actor: ActorRef,
   val webUiPort: Int,
-  val publicAddress: String) {
+  val publicAddress: String)
+  extends Serializable {
 
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)
 
-  var executors = new mutable.HashMap[String, ExecutorInfo]  // fullId => info
-  var state: WorkerState.Value = WorkerState.ALIVE
-  var coresUsed = 0
-  var memoryUsed = 0
+  @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info
+  @transient var state: WorkerState.Value = _
+  @transient var coresUsed: Int = _
+  @transient var memoryUsed: Int = _
 
-  var lastHeartbeat = System.currentTimeMillis()
+  @transient var lastHeartbeat: Long = _
+
+  init()
 
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
+  private def readObject(in: java.io.ObjectInputStream): Unit = {
+    in.defaultReadObject()
+    init()
+  }
+
+  private def init() {
+    executors = new mutable.HashMap[String, ExecutorInfo]
+    state = WorkerState.ALIVE
+    coresUsed = 0
+    memoryUsed = 0
+    lastHeartbeat = System.currentTimeMillis()
+  }
+
   def hostPort: String = {
     assert (port > 0)
     host + ":" + port
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
index b5ee6dca79fab36aeace009d436d7fd1ea69e481..c8d34f25e25e053cb950323d78003680eb46f71c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.deploy.master
 
-private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
+private[spark] object WorkerState
+  extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED", "UNKNOWN") {
+
   type WorkerState = Value
 
-  val ALIVE, DEAD, DECOMMISSIONED = Value
+  val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index e3dc30eefc56f951e69aa186982c0ddfe03573d2..8fabc956659015c4df1796a918779a954b81705d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -43,7 +43,8 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val sparkHome: File,
-    val workDir: File)
+    val workDir: File,
+    var state: ExecutorState.Value)
   extends Logging {
 
   val fullId = appId + "/" + execId
@@ -83,7 +84,8 @@ private[spark] class ExecutorRunner(
         process.destroy()
         process.waitFor()
       }
-      worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
+      state = ExecutorState.KILLED
+      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
     }
   }
@@ -180,9 +182,9 @@ private[spark] class ExecutorRunner(
       // long-lived processes only. However, in the future, we might restart the executor a few
       // times on the same machine.
       val exitCode = process.waitFor()
+      state = ExecutorState.FAILED
       val message = "Command exited with code " + exitCode
-      worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
-                                    Some(exitCode))
+      worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {
       case interrupted: InterruptedException =>
         logInfo("Runner thread for executor " + fullId + " interrupted")
@@ -192,8 +194,9 @@ private[spark] class ExecutorRunner(
         if (process != null) {
           process.destroy()
         }
+        state = ExecutorState.FAILED
         val message = e.getClass + ": " + e.getMessage
-        worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
+        worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 09530beb3bec05ca7e37b1f006ae7d14d1748257..46455aa5aeb6f7d49c6053cb68af5a789dd0f47e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -27,8 +27,8 @@ import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
 import akka.util.duration._
 
-import org.apache.spark.{Logging}
-import org.apache.spark.deploy.ExecutorState
+import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
@@ -42,7 +42,7 @@ private[spark] class Worker(
     webUiPort: Int,
     cores: Int,
     memory: Int,
-    masterUrl: String,
+    var masterUrl: String,
     workDirPath: String = null)
   extends Actor with Logging {
 
@@ -125,19 +125,30 @@ private[spark] class Worker(
         master ! Heartbeat(workerId)
       }
 
+    case MasterChanged(url, uiUrl) =>
+      logInfo("Master has changed, new master is at " + url)
+      masterUrl = url
+      masterWebUiUrl = uiUrl
+      context.unwatch(master)
+      master = context.actorFor(Master.toAkkaUrl(masterUrl))
+      context.watch(master) // Doesn't work with remote actors, but useful for testing
+      val execs = executors.values.
+        map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
+      sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
+
     case RegisterWorkerFailed(message) =>
       logError("Worker registration failed: " + message)
       System.exit(1)
 
     case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
       logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
-      val manager = new ExecutorRunner(
-        appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
+      val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+        self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
       executors(appId + "/" + execId) = manager
       manager.start()
       coresUsed += cores_
       memoryUsed += memory_
-      master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
+      master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
 
     case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
       master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
@@ -174,11 +185,7 @@ private[spark] class Worker(
   }
 
   def masterDisconnected() {
-    // TODO: It would be nice to try to reconnect to the master, but just shut down for now.
-    // (Note that if reconnecting we would also need to assign IDs differently.)
-    logError("Connection to master failed! Shutting down.")
-    executors.values.foreach(_.kill())
-    System.exit(1)
+    logError("Connection to master failed! Waiting for master to reconnect...")
   }
 
   def generateWorkerId(): String = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c49768c0caaa1211fb08457c356951bb68e6252..c173cdf4492ac5feaaa2d7015060d15b9897e7b0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -71,8 +71,7 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def disconnected() {
     if (!stopping) {
-      logError("Disconnected from Spark cluster!")
-      scheduler.error("Disconnected from Spark cluster")
+      logError("Disconnected from Spark cluster! Waiting for reconnection...")
     }
   }