diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 4e7d11996f8b2d8db3a00dbbc19e7fc8a2008cad..df295b18207b3253d303cee2e3d51730789ec953 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -47,7 +47,7 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter } } -private[spark] +private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message) @@ -200,31 +200,36 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster, } /** - * Actually send a BlockUpdate message. Returns the mater's repsonse, which will be true if theo - * block was successfully recorded and false if the slave needs to reregister. + * Actually send a BlockUpdate message. Returns the mater's response, which will be true if the + * block was successfully recorded and false if the slave needs to re-register. */ private def tryToReportBlockStatus(blockId: String): Boolean = { - val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match { + val (curLevel, inMemSize, onDiskSize, tellMaster) = blockInfo.get(blockId) match { case null => - (StorageLevel.NONE, 0L, 0L) + (StorageLevel.NONE, 0L, 0L, false) case info => info.synchronized { info.level match { case null => - (StorageLevel.NONE, 0L, 0L) + (StorageLevel.NONE, 0L, 0L, false) case level => val inMem = level.useMemory && memoryStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) ( new StorageLevel(onDisk, inMem, level.deserialized, level.replication), if (inMem) memoryStore.getSize(blockId) else 0L, - if (onDisk) diskStore.getSize(blockId) else 0L + if (onDisk) diskStore.getSize(blockId) else 0L, + info.tellMaster ) } } } - return master.mustBlockUpdate( - BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)) + + if (tellMaster) { + master.mustBlockUpdate(BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)) + } else { + true + } } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index a7b60fc2cffc2a7f293ba4b072b73ce659ba2cda..0a4e68f43769c6ced41060c14df5a8cc07298ba5 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -215,7 +215,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor val toRemove = new HashSet[BlockManagerId] for (info <- blockManagerInfo.values) { if (info.lastSeenMs < minSeenTime) { - logInfo("Removing BlockManager " + info.blockManagerId + " with no recent heart beats") + logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats") toRemove += info.blockManagerId } } @@ -279,7 +279,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor case ExpireDeadHosts => expireDeadHosts() - case HeartBeat(blockManagerId) => + case HeartBeat(blockManagerId) => heartBeat(blockManagerId) case other => @@ -538,7 +538,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool val answer = askMaster(msg).asInstanceOf[Boolean] return Some(answer) } catch { - case e: Exception => + case e: Exception => logError("Failed in syncHeartBeat", e) return None }