diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2f9473aedc2dcef118d37269f67bac476522e5cc..83a9cbd63d391f69a14bcfd0a58e2d2be667f02a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -402,6 +402,19 @@ private[spark] class BlockManager(
     locations
   }
 
+  /**
+   * Cleanup code run in response to a failed local read.
+   * Must be called while holding a read lock on the block.
+   */
+  private def handleLocalReadFailure(blockId: BlockId): Nothing = {
+    releaseLock(blockId)
+    // Remove the missing block so that its unavailability is reported to the driver
+    removeBlock(blockId)
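+    // Failing here fails the task that attempted the read; since the block has been removed,
+    // a retried task will recompute the block rather than hit the same failure.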
+    throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+  }
+
   /**
    * Get block from local block manager as an iterator of Java objects.
    */
@@ -441,8 +454,7 @@
           val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
           Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
-          releaseLock(blockId)
-          throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+          handleLocalReadFailure(blockId)
         }
     }
   }
@@ -489,8 +501,7 @@
         // The block was not found on disk, so serialize an in-memory copy:
         serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
       } else {
-        releaseLock(blockId)
-        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+        handleLocalReadFailure(blockId)
       }
     } else {  // storage level is serialized
       if (level.useMemory && memoryStore.contains(blockId)) {
@@ -499,8 +510,7 @@
         val diskBytes = diskStore.getBytes(blockId)
         maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
       } else {
-        releaseLock(blockId)
-        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+        handleLocalReadFailure(blockId)
       }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index 333c23bdaf6d6626aea65ca5484f8e2a9e7a961c..132f6361e41e6bdc748c553d9c394393b97276fd 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.io.{IOException, NotSerializableException, ObjectInputStream}
 
 import org.apache.spark.memory.TestMemoryConsumer
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NonSerializable
 
 // Common state shared by FailureSuite-launched tasks. We use a global object
@@ -241,6 +242,18 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  test("failure because cached RDD partitions are missing from DiskStore (SPARK-15736)") {
+    sc = new SparkContext("local[1,2]", "test")
+    val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
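+    // Materialize the RDD so that its partitions are cached in the DiskStore: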
+    rdd.count()
+    // Directly delete all files from the disk store, triggering failures when reading cached data:
+    SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())
+    // Each task should fail once due to missing cached data, but then should succeed on its second
+    // attempt because the missing cache locations will be purged and the blocks will be recomputed.
+    rdd.count()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a2580304c4ed29fea1f277ffadba46ac8ac56fcf..6821582254f5b0999a8e43652f8da39a73763900 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1139,6 +1139,52 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getSingle("a3").isDefined, "a3 was not in store")
   }
 
+  private def testReadWithLossOfOnDiskFiles(
+      storageLevel: StorageLevel,
+      readMethod: BlockManager => Option[_]): Unit = {
+    store = makeBlockManager(12000)
+    assert(store.putSingle("blockId", new Array[Byte](4000), storageLevel))
+    assert(store.getStatus("blockId").isDefined)
+    // Directly delete all files from the disk store, triggering failures when reading blocks:
+    store.diskBlockManager.getAllFiles().foreach(_.delete())
+    // The BlockManager still thinks that these blocks exist:
+    assert(store.getStatus("blockId").isDefined)
+    // Because the BlockManager's metadata claims that the block exists (i.e. that it's present
+    // in at least one store), a read is attempted and fails because the on-disk file is
+    // missing.
+    intercept[SparkException] {
+      readMethod(store)
+    }
+    // Subsequent read attempts will succeed: the block isn't present, so the read returns the
+    // expected "block not found" result (None) rather than a fatal error:
+    assert(readMethod(store).isEmpty)
+    // This second read succeeds because the metadata entry for the missing block was removed
+    // as a result of the first read's failure:
+    assert(store.getStatus("blockId").isEmpty)
+  }
+
+  test("remove block if a read fails due to missing DiskStore files (SPARK-15736)") {
+    val storageLevels = Seq(
+      StorageLevel(useDisk = true, useMemory = false, deserialized = false),
+      StorageLevel(useDisk = true, useMemory = false, deserialized = true))
+    val readMethods = Map[String, BlockManager => Option[_]](
+      "getLocalBytes" -> ((m: BlockManager) => m.getLocalBytes("blockId")),
+      "getLocalValues" -> ((m: BlockManager) => m.getLocalValues("blockId"))
+    )
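+    // Test the DISK_ONLY / getLocalBytes case directly, then exercise every combination of
+    // read method and disk-backed storage level: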
+    testReadWithLossOfOnDiskFiles(StorageLevel.DISK_ONLY, _.getLocalBytes("blockId"))
+    for ((readMethodName, readMethod) <- readMethods; storageLevel <- storageLevels) {
+      withClue(s"$readMethodName $storageLevel") {
+        testReadWithLossOfOnDiskFiles(storageLevel, readMethod)
+      }
+    }
+  }
+
   test("SPARK-13328: refresh block locations (fetch should fail after hitting a threshold)") {
     val mockBlockTransferService =
       new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5))