diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index cf11393a0398e6671582d6ef56cd7a03a2fe230f..e8a1e5889ff3242826111646a791442701b4a953 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -41,7 +41,6 @@ private[spark] class BlockManagerMaster( } } - /** Remove a dead host from the master actor. This is only called on the master side. */ def notifyADeadHost(host: String) { tell(RemoveHost(host)) diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index 0d84e559cb470a1ad9e48f7f9959fe499641ade7..e3de8d8e4e593f432df47ab727e742c4771a0020 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -16,91 +16,6 @@ import spark.{Logging, Utils} * BlockManagerMasterActor is an actor on the master node to track statuses of * all slaves' block managers. */ - -private[spark] -object BlockManagerMasterActor { - - case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) - - class BlockManagerInfo( - val blockManagerId: BlockManagerId, - timeMs: Long, - val maxMem: Long, - val slaveActor: ActorRef) - extends Logging { - - private var _lastSeenMs: Long = timeMs - private var _remainingMem: Long = maxMem - - // Mapping from block id to its status. - private val _blocks = new JHashMap[String, BlockStatus] - - logInfo("Registering block manager %s:%d with %s RAM".format( - blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem))) - - def updateLastSeenMs() { - _lastSeenMs = System.currentTimeMillis() - } - - def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) - : Unit = synchronized { - - updateLastSeenMs() - - if (_blocks.containsKey(blockId)) { - // The block exists on the slave already. - val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel - - if (originalLevel.useMemory) { - _remainingMem += memSize - } - } - - if (storageLevel.isValid) { - // isValid means it is either stored in-memory or on-disk. - _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize)) - if (storageLevel.useMemory) { - _remainingMem -= memSize - logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format( - blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize), - Utils.memoryBytesToString(_remainingMem))) - } - if (storageLevel.useDisk) { - logInfo("Added %s on disk on %s:%d (size: %s)".format( - blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize))) - } - } else if (_blocks.containsKey(blockId)) { - // If isValid is not true, drop the block. - val blockStatus: BlockStatus = _blocks.get(blockId) - _blocks.remove(blockId) - if (blockStatus.storageLevel.useMemory) { - _remainingMem += blockStatus.memSize - logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format( - blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize), - Utils.memoryBytesToString(_remainingMem))) - } - if (blockStatus.storageLevel.useDisk) { - logInfo("Removed %s on %s:%d on disk (size: %s)".format( - blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize))) - } - } - } - - def remainingMem: Long = _remainingMem - - def lastSeenMs: Long = _lastSeenMs - - def blocks: JHashMap[String, BlockStatus] = _blocks - - override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem - - def clear() { - _blocks.clear() - } - } -} - - private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { @@ -108,8 +23,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo] - // Mapping from host name to block manager id. - private val blockManagerIdByHost = new HashMap[String, BlockManagerId] + // Mapping from host name to block manager id. We allow multiple block managers + // on the same host name (ip). + private val blockManagerIdByHost = new HashMap[String, ArrayBuffer[BlockManagerId]] // Mapping from block id to the set of block managers that have the block. private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]] @@ -132,9 +48,62 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { super.preStart() } + def receive = { + case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) => + register(blockManagerId, maxMemSize, slaveActor) + + case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => + blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) + + case GetLocations(blockId) => + getLocations(blockId) + + case GetLocationsMultipleBlockIds(blockIds) => + getLocationsMultipleBlockIds(blockIds) + + case GetPeers(blockManagerId, size) => + getPeersDeterministic(blockManagerId, size) + /*getPeers(blockManagerId, size)*/ + + case GetMemoryStatus => + getMemoryStatus + + case RemoveBlock(blockId) => + removeBlock(blockId) + + case RemoveHost(host) => + removeHost(host) + sender ! true + + case StopBlockManagerMaster => + logInfo("Stopping BlockManagerMaster") + sender ! true + if (timeoutCheckingTask != null) { + timeoutCheckingTask.cancel + } + context.stop(self) + + case ExpireDeadHosts => + expireDeadHosts() + + case HeartBeat(blockManagerId) => + heartBeat(blockManagerId) + + case other => + logInfo("Got unknown message: " + other) + } + def removeBlockManager(blockManagerId: BlockManagerId) { val info = blockManagerInfo(blockManagerId) - blockManagerIdByHost.remove(blockManagerId.ip) + + // Remove the block manager from blockManagerIdByHost. If the list of block + // managers belonging to the IP is empty, remove the entry from the hash map. + blockManagerIdByHost.get(blockManagerId.ip).foreach { managers: ArrayBuffer[BlockManagerId] => + managers -= blockManagerId + if (managers.size == 0) blockManagerIdByHost.remove(blockManagerId.ip) + } + + // Remove it from blockManagerInfo and remove all the blocks. blockManagerInfo.remove(blockManagerId) var iterator = info.blocks.keySet.iterator while (iterator.hasNext) { @@ -158,14 +127,13 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { toRemove += info.blockManagerId } } - // TODO: Remove corresponding block infos toRemove.foreach(removeBlockManager) } def removeHost(host: String) { logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.") logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq) - blockManagerIdByHost.get(host).foreach(removeBlockManager) + blockManagerIdByHost.get(host).foreach(_.foreach(removeBlockManager)) logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq) sender ! true } @@ -183,51 +151,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { } } - def receive = { - case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) => - register(blockManagerId, maxMemSize, slaveActor) - - case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => - blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) - - case GetLocations(blockId) => - getLocations(blockId) - - case GetLocationsMultipleBlockIds(blockIds) => - getLocationsMultipleBlockIds(blockIds) - - case GetPeers(blockManagerId, size) => - getPeersDeterministic(blockManagerId, size) - /*getPeers(blockManagerId, size)*/ - - case GetMemoryStatus => - getMemoryStatus - - case RemoveBlock(blockId) => - removeBlock(blockId) - - case RemoveHost(host) => - removeHost(host) - sender ! true - - case StopBlockManagerMaster => - logInfo("Stopping BlockManagerMaster") - sender ! true - if (timeoutCheckingTask != null) { - timeoutCheckingTask.cancel - } - context.stop(self) - - case ExpireDeadHosts => - expireDeadHosts() - - case HeartBeat(blockManagerId) => - heartBeat(blockManagerId) - - case other => - logInfo("Got unknown message: " + other) - } - // Remove a block from the slaves that have it. This can only be used to remove // blocks that the master knows about. private def removeBlock(blockId: String) { @@ -261,20 +184,22 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { val startTimeMs = System.currentTimeMillis() val tmp = " " + blockManagerId + " " logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) - if (blockManagerIdByHost.contains(blockManagerId.ip) && - blockManagerIdByHost(blockManagerId.ip) != blockManagerId) { - val oldId = blockManagerIdByHost(blockManagerId.ip) - logInfo("Got second registration for host " + blockManagerId + - "; removing old slave " + oldId) - removeBlockManager(oldId) - } + if (blockManagerId.ip == Utils.localHostName() && !isLocal) { logInfo("Got Register Msg from master node, don't register it") } else { + blockManagerIdByHost.get(blockManagerId.ip) match { + case Some(managers) => + // A block manager of the same host name already exists. + logInfo("Got another registration for host " + blockManagerId) + managers += blockManagerId + case None => + blockManagerIdByHost += (blockManagerId.ip -> ArrayBuffer(blockManagerId)) + } + blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo( blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor)) } - blockManagerIdByHost += (blockManagerId.ip -> blockManagerId) logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs)) sender ! true } @@ -387,12 +312,12 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - val peersWithIndices = peers.zipWithIndex - val selfIndex = peersWithIndices.find(_._1 == blockManagerId).map(_._2).getOrElse(-1) + val selfIndex = peers.indexOf(blockManagerId) if (selfIndex == -1) { throw new Exception("Self index for " + blockManagerId + " not found") } + // Note that this logic will select the same node multiple times if there aren't enough peers var index = selfIndex while (res.size < size) { index += 1 @@ -404,3 +329,87 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { sender ! res.toSeq } } + + +private[spark] +object BlockManagerMasterActor { + + case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) + + class BlockManagerInfo( + val blockManagerId: BlockManagerId, + timeMs: Long, + val maxMem: Long, + val slaveActor: ActorRef) + extends Logging { + + private var _lastSeenMs: Long = timeMs + private var _remainingMem: Long = maxMem + + // Mapping from block id to its status. + private val _blocks = new JHashMap[String, BlockStatus] + + logInfo("Registering block manager %s:%d with %s RAM".format( + blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem))) + + def updateLastSeenMs() { + _lastSeenMs = System.currentTimeMillis() + } + + def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) + : Unit = synchronized { + + updateLastSeenMs() + + if (_blocks.containsKey(blockId)) { + // The block exists on the slave already. + val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel + + if (originalLevel.useMemory) { + _remainingMem += memSize + } + } + + if (storageLevel.isValid) { + // isValid means it is either stored in-memory or on-disk. + _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize)) + if (storageLevel.useMemory) { + _remainingMem -= memSize + logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize), + Utils.memoryBytesToString(_remainingMem))) + } + if (storageLevel.useDisk) { + logInfo("Added %s on disk on %s:%d (size: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize))) + } + } else if (_blocks.containsKey(blockId)) { + // If isValid is not true, drop the block. + val blockStatus: BlockStatus = _blocks.get(blockId) + _blocks.remove(blockId) + if (blockStatus.storageLevel.useMemory) { + _remainingMem += blockStatus.memSize + logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize), + Utils.memoryBytesToString(_remainingMem))) + } + if (blockStatus.storageLevel.useDisk) { + logInfo("Removed %s on %s:%d on disk (size: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize))) + } + } + } + + def remainingMem: Long = _remainingMem + + def lastSeenMs: Long = _lastSeenMs + + def blocks: JHashMap[String, BlockStatus] = _blocks + + override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem + + def clear() { + _blocks.clear() + } + } +} diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index e50ce1430fdc118919405cc849992e85a3bb8ee7..4e28a7e2bc8ed0e05b9197b4a9aa092857ff7f1a 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -122,16 +122,16 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("master + 2 managers interaction") { store = new BlockManager(actorSystem, master, serializer, 2000) - val otherStore = new BlockManager(actorSystem, master, new KryoSerializer, 2000) + store2 = new BlockManager(actorSystem, master, new KryoSerializer, 2000) val peers = master.getPeers(store.blockManagerId, 1) assert(peers.size === 1, "master did not return the other manager as a peer") - assert(peers.head === otherStore.blockManagerId, "peer returned by master is not the other manager") + assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager") val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2) - otherStore.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2) + store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2) assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1") assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2") } @@ -189,8 +189,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(master.getLocations("a1").size == 0, "a1 was not removed from master") store invokePrivate heartBeat() - assert(master.getLocations("a1").size > 0, - "a1 was not reregistered with master") + assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master") } test("reregistration on block update") { @@ -211,28 +210,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(master.getLocations("a2").size > 0, "master was not told about a2") } - test("deregistration on duplicate") { - val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager(actorSystem, master, serializer, 2000) - val a1 = new Array[Byte](400) - - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) - - assert(master.getLocations("a1").size > 0, "master was not told about a1") - - store2 = new BlockManager(actorSystem, master, serializer, 2000) - - assert(master.getLocations("a1").size == 0, "a1 was not removed from master") - - store invokePrivate heartBeat() - - assert(master.getLocations("a1").size > 0, "master was not told about a1") - - store2 invokePrivate heartBeat() - - assert(master.getLocations("a1").size == 0, "a2 was not removed from master") - } - test("in-memory LRU storage") { store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400)