Skip to content
Snippets Groups Projects
Commit 06f855c2 authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge branch 'spark-633' of github.com:rxin/spark into spark-633

parents 8c01295b 7c9e3d1c
No related branches found
No related tags found
No related merge requests found
...@@ -829,7 +829,10 @@ class BlockManager( ...@@ -829,7 +829,10 @@ class BlockManager(
diskStore.putBytes(blockId, bytes, level) diskStore.putBytes(blockId, bytes, level)
} }
} }
memoryStore.remove(blockId) val blockWasRemoved = memoryStore.remove(blockId)
if (!blockWasRemoved) {
logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
}
if (info.tellMaster) { if (info.tellMaster) {
reportBlockStatus(blockId) reportBlockStatus(blockId)
} }
...@@ -853,8 +856,12 @@ class BlockManager( ...@@ -853,8 +856,12 @@ class BlockManager(
val info = blockInfo.get(blockId).orNull val info = blockInfo.get(blockId).orNull
if (info != null) info.synchronized { if (info != null) info.synchronized {
// Removals are idempotent in disk store and memory store. At worst, we get a warning. // Removals are idempotent in disk store and memory store. At worst, we get a warning.
memoryStore.remove(blockId) val removedFromMemory = memoryStore.remove(blockId)
diskStore.remove(blockId) val removedFromDisk = diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
logWarning("Block " + blockId + " could not be removed as it was not found in either " +
"the disk or memory store")
}
blockInfo.remove(blockId) blockInfo.remove(blockId)
} else { } else {
// The block has already been removed; do nothing. // The block has already been removed; do nothing.
......
...@@ -31,7 +31,12 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging { ...@@ -31,7 +31,12 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def getValues(blockId: String): Option[Iterator[Any]] def getValues(blockId: String): Option[Iterator[Any]]
def remove(blockId: String) /**
* Remove a block, if it exists.
* @param blockId the block to remove.
* @return True if the block was found and removed, False otherwise.
*/
def remove(blockId: String): Boolean
def contains(blockId: String): Boolean def contains(blockId: String): Boolean
......
...@@ -92,10 +92,13 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) ...@@ -92,10 +92,13 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes)) getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes))
} }
override def remove(blockId: String) { override def remove(blockId: String): Boolean = {
val file = getFile(blockId) val file = getFile(blockId)
if (file.exists()) { if (file.exists()) {
file.delete() file.delete()
true
} else {
false
} }
} }
......
...@@ -90,7 +90,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -90,7 +90,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} }
} }
override def remove(blockId: String) { override def remove(blockId: String): Boolean = {
entries.synchronized { entries.synchronized {
val entry = entries.get(blockId) val entry = entries.get(blockId)
if (entry != null) { if (entry != null) {
...@@ -98,8 +98,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) ...@@ -98,8 +98,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
currentMemory -= entry.size currentMemory -= entry.size
logInfo("Block %s of size %d dropped from memory (free %d)".format( logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory)) blockId, entry.size, freeMemory))
true
} else { } else {
logWarning("Block " + blockId + " could not be removed as it does not exist") false
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment