Skip to content
Snippets Groups Projects
Commit 21b271f5 authored by Reynold Xin's avatar Reynold Xin
Browse files

Suppress shuffle block updates when a slave node comes back.

parent c10b2299
No related branches found
No related tags found
No related merge requests found
......@@ -47,7 +47,7 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
}
}
private[spark]
private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
......@@ -200,31 +200,36 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
}
/**
* Actually send a BlockUpdate message. Returns the mater's repsonse, which will be true if theo
* block was successfully recorded and false if the slave needs to reregister.
* Actually send a BlockUpdate message. Returns the mater's response, which will be true if the
* block was successfully recorded and false if the slave needs to re-register.
*/
private def tryToReportBlockStatus(blockId: String): Boolean = {
val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
val (curLevel, inMemSize, onDiskSize, tellMaster) = blockInfo.get(blockId) match {
case null =>
(StorageLevel.NONE, 0L, 0L)
(StorageLevel.NONE, 0L, 0L, false)
case info =>
info.synchronized {
info.level match {
case null =>
(StorageLevel.NONE, 0L, 0L)
(StorageLevel.NONE, 0L, 0L, false)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
(
new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
if (inMem) memoryStore.getSize(blockId) else 0L,
if (onDisk) diskStore.getSize(blockId) else 0L
if (onDisk) diskStore.getSize(blockId) else 0L,
info.tellMaster
)
}
}
}
return master.mustBlockUpdate(
BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
if (tellMaster) {
master.mustBlockUpdate(BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
} else {
true
}
}
......
......@@ -215,7 +215,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val toRemove = new HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime) {
logInfo("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
toRemove += info.blockManagerId
}
}
......@@ -279,7 +279,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case ExpireDeadHosts =>
expireDeadHosts()
case HeartBeat(blockManagerId) =>
case HeartBeat(blockManagerId) =>
heartBeat(blockManagerId)
case other =>
......@@ -538,7 +538,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
val answer = askMaster(msg).asInstanceOf[Boolean]
return Some(answer)
} catch {
case e: Exception =>
case e: Exception =>
logError("Failed in syncHeartBeat", e)
return None
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment