diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6946a98cdda68464f7a1c70441c438f2a4333da0..45b73380806ddba798e75241b56021059a8ddf7b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1159,6 +1159,34 @@ private[spark] class BlockManager(
     }
   }
 
+  /**
+   * Called to pro-actively replenish replicas of blocks that were lost due to executor failures.
+   *
+   * @param blockId blockId of the block being replicated
+   * @param existingReplicas existing block managers that have a replica of the block
+   * @param maxReplicas maximum number of replicas needed
+   */
+  def replicateBlock(
+      blockId: BlockId,
+      existingReplicas: Set[BlockManagerId],
+      maxReplicas: Int): Unit = {
+    logInfo(s"Pro-actively replicating $blockId")
+    blockInfoManager.lockForReading(blockId).foreach { info =>
+      val data = doGetLocalBytes(blockId, info)
+      val storageLevel = StorageLevel(
+        useDisk = info.level.useDisk,
+        useMemory = info.level.useMemory,
+        useOffHeap = info.level.useOffHeap,
+        deserialized = info.level.deserialized,
+        replication = maxReplicas)
+      try {
+        replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
+      } finally {
+        releaseLock(blockId)
+      }
+    }
+  }
+
   /**
    * Replicate block to another node. Note that this is a blocking call that returns after
    * the block has been replicated.
@@ -1167,7 +1195,8 @@ private[spark] class BlockManager(
       blockId: BlockId,
       data: ChunkedByteBuffer,
       level: StorageLevel,
-      classTag: ClassTag[_]): Unit = {
+      classTag: ClassTag[_],
+      existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
 
     val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
     val tLevel = StorageLevel(
@@ -1181,20 +1210,22 @@ private[spark] class BlockManager(
 
     val startTime = System.nanoTime
 
-    var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
+    var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId] ++ existingReplicas
     var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
     var numFailures = 0
 
+    val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_))
+
     var peersForReplication = blockReplicationPolicy.prioritize(
       blockManagerId,
-      getPeers(false),
-      mutable.HashSet.empty,
+      initialPeers,
+      peersReplicatedTo,
       blockId,
       numPeersToReplicateTo)
 
     while(numFailures <= maxReplicationFailures &&
-        !peersForReplication.isEmpty &&
-        peersReplicatedTo.size != numPeersToReplicateTo) {
+      !peersForReplication.isEmpty &&
+      peersReplicatedTo.size < numPeersToReplicateTo) {
       val peer = peersForReplication.head
       try {
         val onePeerStartTime = System.nanoTime
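
The effect of seeding `peersReplicatedTo` with `existingReplicas` is that the surviving copies count toward the replication target, so `replicate` only creates the shortfall. A minimal standalone sketch of that arithmetic (hypothetical helper names, not Spark API):

```scala
// Sketch only: how pre-counting surviving replicas bounds the number of new copies.
object ReplicaArithmeticSketch {
  def newCopiesNeeded(maxReplicas: Int, survivingRemoteReplicas: Int): Int = {
    // replicate() targets maxReplicas - 1 remote peers (the local copy on the
    // source executor is the first replica); peers that already hold the block
    // count toward that target via the seeded peersReplicatedTo set.
    val numPeersToReplicateTo = maxReplicas - 1
    math.max(numPeersToReplicateTo - survivingRemoteReplicas, 0)
  }

  def main(args: Array[String]): Unit = {
    // A block cached with replication = 3 loses one executor: two replicas survive,
    // the source executor sees one other remote replica, so one new copy is made.
    println(newCopiesNeeded(maxReplicas = 3, survivingRemoteReplicas = 1)) // prints 1
  }
}
```
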
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 145c434a4f0cf4a7885feee6ee2e79485fdd9d57..84c04d22600adaf8a2a25d0078d348615f05e1e1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -22,6 +22,7 @@ import java.util.{HashMap => JHashMap}
 import scala.collection.mutable
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Random
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
@@ -65,6 +66,8 @@ class BlockManagerMasterEndpoint(
     mapper
   }
 
+  private val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
+
   logInfo("BlockManagerMasterEndpoint up")
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -195,17 +198,37 @@
 
     // Remove it from blockManagerInfo and remove all the blocks.
     blockManagerInfo.remove(blockManagerId)
+
     val iterator = info.blocks.keySet.iterator
     while (iterator.hasNext) {
       val blockId = iterator.next
       val locations = blockLocations.get(blockId)
       locations -= blockManagerId
+      // De-register the block if none of the block managers have it. Otherwise, if pro-active
+      // replication is enabled, and a block is either an RDD or a test block (the latter is used
+      // for unit testing), we send a message to a randomly chosen executor location to replicate
+      // the given block. Note that we ignore other block types (such as broadcast and shuffle
+      // blocks) as replication doesn't make much sense in that context.
       if (locations.size == 0) {
         blockLocations.remove(blockId)
+        logWarning(s"No more replicas available for $blockId !")
+      } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+        // As a heuristic, assume a single executor failure to estimate the number of replicas
+        // that existed before the failure.
+        val maxReplicas = locations.size + 1
+        val i = new Random(blockId.hashCode).nextInt(locations.size)
+        val locationSeq = locations.toSeq
+        val candidateBMId = locationSeq(i)
+        blockManagerInfo.get(candidateBMId).foreach { bm =>
+          val remainingLocations = locationSeq.filter(_ != candidateBMId)
+          val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
+          bm.slaveEndpoint.ask[Boolean](replicateMsg)
+        }
       }
     }
+
     listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
     logInfo(s"Removing block manager $blockManagerId")
   }
 
   private def removeExecutor(execId: String) {
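
The candidate source executor is picked by seeding `Random` with `blockId.hashCode`, so the choice is deterministic for a given block while different blocks spread the re-replication work across the survivors. A standalone illustration (executor names are made up):

```scala
import scala.util.Random

// Sketch of the selection above: the block-derived seed makes the pick stable
// for a given block, while different blocks tend to pick different survivors.
object CandidateSelectionSketch {
  def pickSource(blockName: String, survivors: Seq[String]): String =
    survivors(new Random(blockName.hashCode).nextInt(survivors.size))

  def main(args: Array[String]): Unit = {
    val survivors = Seq("exec-1", "exec-2", "exec-3")
    println(pickSource("rdd_0_7", survivors)) // same survivor every run
    println(pickSource("rdd_0_8", survivors)) // likely a different survivor
  }
}
```
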
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index d71acbb4cf77166e8fd1da5a3d486a4734541ee8..0aea438e7f473e0baea6dd3038d332d8e26cbe3e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -32,6 +32,10 @@ private[spark] object BlockManagerMessages {
   // blocks that the master knows about.
   case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave
 
+  // Replicate blocks that were lost due to executor failure
+  case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)
+    extends ToBlockManagerSlave
+
   // Remove all blocks belonging to a specific RDD.
   case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index d17ddbc1625794ec4047a6606f61af5c557df44f..1aaa42459df69326b7f21fd920c5ecb2f0a50f8d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -74,6 +74,11 @@ class BlockManagerSlaveEndpoint(
 
     case TriggerThreadDump =>
       context.reply(Utils.getThreadDump())
+
+    case ReplicateBlock(blockId, replicas, maxReplicas) =>
+      blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas)
+      context.reply(true)
+
   }
 
   private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {
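
Note that the master never consumes the `Future` returned by `ask[Boolean]`, so the `true` reply is effectively fire-and-forget and a failed replication surfaces only in logs. If an acknowledgement were wanted, the outcome could be observed along these lines (a hedged sketch with assumed names, using `println` in place of Spark's logging):

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical variation (not in this patch): observe the outcome of the
// ReplicateBlock ask instead of dropping the Future on the floor.
object ReplicationAckSketch {
  def logOutcome(blockId: String, ack: Future[Boolean])
                (implicit ec: ExecutionContext): Unit =
    ack.onComplete {
      case Success(done) => println(s"Pro-active replication of $blockId finished: $done")
      case Failure(e) => println(s"Pro-active replication of $blockId failed: $e")
    }
}
```
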
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index f4bfdc2fd69a93cd85088bd61fb483889774796a..ccede34b8cb4d6aa138fcb29cf5b3e9b4566c135 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -37,32 +37,31 @@ import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.StorageLevel._
 
-/** Testsuite that tests block replication in BlockManager */
-class BlockManagerReplicationSuite extends SparkFunSuite
-    with Matchers
-    with BeforeAndAfter
-    with LocalSparkContext {
-
-  private val conf = new SparkConf(false).set("spark.app.id", "test")
-  private var rpcEnv: RpcEnv = null
-  private var master: BlockManagerMaster = null
-  private val securityMgr = new SecurityManager(conf)
-  private val bcastManager = new BroadcastManager(true, conf, securityMgr)
-  private val mapOutputTracker = new MapOutputTrackerMaster(conf, bcastManager, true)
-  private val shuffleManager = new SortShuffleManager(conf)
+trait BlockManagerReplicationBehavior extends SparkFunSuite
+  with Matchers
+  with BeforeAndAfter
+  with LocalSparkContext {
+
+  val conf: SparkConf
+  protected var rpcEnv: RpcEnv = null
+  protected var master: BlockManagerMaster = null
+  protected lazy val securityMgr = new SecurityManager(conf)
+  protected lazy val bcastManager = new BroadcastManager(true, conf, securityMgr)
+  protected lazy val mapOutputTracker = new MapOutputTrackerMaster(conf, bcastManager, true)
+  protected lazy val shuffleManager = new SortShuffleManager(conf)
 
   // List of block managers created during a unit test, so that all of them can be stopped
   // after the unit test.
-  private val allStores = new ArrayBuffer[BlockManager]
+  protected val allStores = new ArrayBuffer[BlockManager]
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-  conf.set("spark.kryoserializer.buffer", "1m")
-  private val serializer = new KryoSerializer(conf)
+
+  protected lazy val serializer = new KryoSerializer(conf)
 
   // Implicitly convert strings to BlockIds for test clarity.
-  private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+  protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
 
-  private def makeBlockManager(
+  protected def makeBlockManager(
       maxMem: Long,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     conf.set("spark.testing.memory", maxMem.toString)
@@ -355,7 +354,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite
    * is correct. Then it also drops the block from memory of each store (using LRU) and
    * again checks whether the master's knowledge gets updated.
    */
-  private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
+  protected def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
     import org.apache.spark.storage.StorageLevel._
 
     assert(maxReplication > 1,
@@ -448,3 +447,61 @@ class BlockManagerReplicationSuite extends SparkFunSuite
     }
   }
 }
+
+class BlockManagerReplicationSuite extends BlockManagerReplicationBehavior {
+  val conf = new SparkConf(false).set("spark.app.id", "test")
+  conf.set("spark.kryoserializer.buffer", "1m")
+}
+
+class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehavior {
+  val conf = new SparkConf(false).set("spark.app.id", "test")
+  conf.set("spark.kryoserializer.buffer", "1m")
+  conf.set("spark.storage.replication.proactive", "true")
+  conf.set("spark.storage.exceptionOnPinLeak", "true")
+
+  (2 to 5).foreach { i =>
+    test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") {
+      testProactiveReplication(i)
+    }
+  }
+
+  def testProactiveReplication(replicationFactor: Int) {
+    val blockSize = 1000
+    val storeSize = 10000
+    val initialStores = (1 to 10).map { i => makeBlockManager(storeSize, s"store$i") }
+
+    val blockId = "a1"
+
+    val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
+    initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+
+    val blockLocations = master.getLocations(blockId)
+    logInfo(s"Initial locations : $blockLocations")
+
+    assert(blockLocations.size === replicationFactor)
+
+    // remove all but one of the block managers hosting the block
+    val executorsToRemove = blockLocations.take(replicationFactor - 1)
+    logInfo(s"Removing $executorsToRemove")
+    executorsToRemove.foreach { exec =>
+      master.removeExecutor(exec.executorId)
+      // give replication enough time to happen and the new block to be reported to the master
+      Thread.sleep(200)
+    }
+
+    // give replication enough time to complete and the read locks to be released
+    Thread.sleep(500)
+
+    val newLocations = master.getLocations(blockId).toSet
+    logInfo(s"New locations : $newLocations")
+    assert(newLocations.size === replicationFactor)
+    // there should be exactly one block manager in common between the initial and new locations
+    assert(newLocations.intersect(blockLocations.toSet).size === 1)
+
+    // check that all read locks have been released
+    initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
+      val locks = bm.releaseAllLocksForTask(BlockInfo.NON_TASK_WRITER)
+      assert(locks.size === 0, "Read locks unreleased!")
+    }
+  }
+}
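
The fixed `Thread.sleep` waits above make the test sensitive to machine speed. A less brittle variant would poll the master until the replicas are replenished; a sketch of what the wait inside `testProactiveReplication` could look like, assuming ScalaTest's `Eventually` utilities (used elsewhere in Spark's test suites) are imported:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Sketch only: replace the sleeps with a poll on the master's view of the block.
eventually(timeout(5.seconds), interval(100.millis)) {
  assert(master.getLocations(blockId).size === replicationFactor)
}
```
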
diff --git a/docs/configuration.md b/docs/configuration.md
index 2fcb3a096aea5cbefbdec4e1b8cc6e0b3df4d08c..63392a741a1f038d1ed4364261aa524fe84ca90d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1000,6 +1000,15 @@ Apart from these, the following properties are also available, and may be useful
     storage space to unroll the new block in its entirety.
   </td>
 </tr>
+<tr>
+  <td><code>spark.storage.replication.proactive</code></td>
+  <td>false</td>
+  <td>
+    Enables proactive block replication for RDD blocks. Cached RDD block replicas lost due to
+    executor failures are replenished if there are any existing available replicas. This tries
+    to restore the replication level of the block to its initial value.
+  </td>
+</tr>
 </table>
 
 ### Execution Behavior
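
For completeness, a minimal, hedged sketch of how an application would enable the new flag (standard Spark API; the app name and data sizes are made up):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Usage sketch: cache an RDD with two replicas and let the master replenish a
// replica lost with a failed executor.
object ProactiveReplicationDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("proactive-replication-demo")
      .set("spark.storage.replication.proactive", "true")
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_ONLY_2).count()
    sc.stop()
  }
}
```
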