diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 682ea7baffa715f640dd88d2fab065c819a40522..7a8ac10cdd88e51ebd06588ca6b7801cc9a693d1 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -59,7 +59,7 @@ class BlockManager( } } - private val blockInfo = new TimeStampedHashMap[String, BlockInfo]() + private val blockInfo = new TimeStampedHashMap[String, BlockInfo] private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) private[storage] val diskStore: BlockStore = @@ -139,8 +139,8 @@ class BlockManager( */ private def reportAllBlocks() { logInfo("Reporting " + blockInfo.size + " blocks to the master.") - for (blockId <- blockInfo.keys) { - if (!tryToReportBlockStatus(blockId)) { + for ((blockId, info) <- blockInfo) { + if (!tryToReportBlockStatus(blockId, info)) { logError("Failed to report " + blockId + " to master; giving up.") return } @@ -168,8 +168,8 @@ class BlockManager( * message reflecting the current status, *not* the desired storage level in its block info. * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk. */ - def reportBlockStatus(blockId: String) { - val needReregister = !tryToReportBlockStatus(blockId) + def reportBlockStatus(blockId: String, info: BlockInfo) { + val needReregister = !tryToReportBlockStatus(blockId, info) if (needReregister) { logInfo("Got told to reregister updating block " + blockId) // Reregistering will report our new block for free. @@ -179,29 +179,23 @@ class BlockManager( } /** - * Actually send a BlockUpdate message. Returns the mater's response, which will be true if the - * block was successfully recorded and false if the slave needs to re-register. + * Actually send a UpdateBlockInfo message. 
Returns the master's response, + * which will be true if the block was successfully recorded and false if + * the slave needs to re-register. */ - private def tryToReportBlockStatus(blockId: String): Boolean = { - val (curLevel, inMemSize, onDiskSize, tellMaster) = blockInfo.get(blockId) match { - case None => - (StorageLevel.NONE, 0L, 0L, false) - case Some(info) => - info.synchronized { - info.level match { - case null => - (StorageLevel.NONE, 0L, 0L, false) - case level => - val inMem = level.useMemory && memoryStore.contains(blockId) - val onDisk = level.useDisk && diskStore.contains(blockId) - ( - new StorageLevel(onDisk, inMem, level.deserialized, level.replication), - if (inMem) memoryStore.getSize(blockId) else 0L, - if (onDisk) diskStore.getSize(blockId) else 0L, - info.tellMaster - ) - } - } + private def tryToReportBlockStatus(blockId: String, info: BlockInfo): Boolean = { + val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized { + info.level match { + case null => + (StorageLevel.NONE, 0L, 0L, false) + case level => + val inMem = level.useMemory && memoryStore.contains(blockId) + val onDisk = level.useDisk && diskStore.contains(blockId) + val storageLevel = new StorageLevel(onDisk, inMem, level.deserialized, level.replication) + val memSize = if (inMem) memoryStore.getSize(blockId) else 0L + val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L + (storageLevel, memSize, diskSize, info.tellMaster) + } } if (tellMaster) { @@ -648,7 +642,7 @@ class BlockManager( // and tell the master about it. myInfo.markReady(size) if (tellMaster) { - reportBlockStatus(blockId) + reportBlockStatus(blockId, myInfo) } } logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs)) @@ -735,7 +729,7 @@ class BlockManager( // and tell the master about it. 
myInfo.markReady(bytes.limit) if (tellMaster) { - reportBlockStatus(blockId) + reportBlockStatus(blockId, myInfo) } } @@ -834,7 +828,7 @@ class BlockManager( logWarning("Block " + blockId + " could not be dropped from memory as it does not exist") } if (info.tellMaster) { - reportBlockStatus(blockId) + reportBlockStatus(blockId, info) } if (!level.useDisk) { // The block is completely gone from this node; forget it so we can put() it again later. @@ -847,9 +841,7 @@ class BlockManager( } /** - * Remove a block from both memory and disk. This one doesn't report to the master - * because it expects the master to initiate the original block removal command, and - * then the master can update the block tracking itself. + * Remove a block from both memory and disk. */ def removeBlock(blockId: String) { logInfo("Removing block " + blockId) @@ -863,6 +855,9 @@ class BlockManager( "the disk or memory store") } blockInfo.remove(blockId) + if (info.tellMaster) { + reportBlockStatus(blockId, info) + } } else { // The block has already been removed; do nothing. 
logWarning("Asked to remove block " + blockId + ", which does not exist") @@ -872,7 +867,7 @@ class BlockManager( def dropOldBlocks(cleanupTime: Long) { logInfo("Dropping blocks older than " + cleanupTime) val iterator = blockInfo.internalMap.entrySet().iterator() - while(iterator.hasNext) { + while (iterator.hasNext) { val entry = iterator.next() val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2) if (time < cleanupTime) { @@ -887,7 +882,7 @@ class BlockManager( iterator.remove() logInfo("Dropped block " + id) } - reportBlockStatus(id) + reportBlockStatus(id, info) } } } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index cb582633c4f9f6a144db12e6e3967ac8f08e494b..a3d8671834dbc676c83e4a739c40462127a93e93 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -101,7 +101,7 @@ private[spark] class BlockManagerMaster( * blocks that the master knows about. */ def removeBlock(blockId: String) { - askMaster(RemoveBlock(blockId)) + askMasterWithRetry(RemoveBlock(blockId)) } /** @@ -130,21 +130,6 @@ private[spark] class BlockManagerMaster( } } - /** - * Send a message to the master actor and get its result within a default timeout, or - * throw a SparkException if this fails. There is no retry logic here so if the Akka - * message is lost, the master actor won't get the command. - */ - private def askMaster[T](message: Any): Any = { - try { - val future = masterActor.ask(message)(timeout) - return Await.result(future, timeout).asInstanceOf[T] - } catch { - case e: Exception => - throw new SparkException("Error communicating with BlockManagerMaster", e) - } - } - /** * Send a message to the master actor and get its result within a default timeout, or * throw a SparkException if this fails. 
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index 0a1be98d832858f5e2b7606b4e25875abbb80322..f4d026da3329c801775275899a7f2dd94136f6dc 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -28,7 +28,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { private val blockManagerIdByHost = new HashMap[String, ArrayBuffer[BlockManagerId]] // Mapping from block id to the set of block managers that have the block. - private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]] + private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]] initLogging() @@ -53,7 +53,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { register(blockManagerId, maxMemSize, slaveActor) case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => - blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) + updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) case GetLocations(blockId) => getLocations(blockId) @@ -108,10 +108,10 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { var iterator = info.blocks.keySet.iterator while (iterator.hasNext) { val blockId = iterator.next - val locations = blockInfo.get(blockId)._2 + val locations = blockLocations.get(blockId)._2 locations -= blockManagerId if (locations.size == 0) { - blockInfo.remove(locations) + blockLocations.remove(blockId) } } } @@ -154,7 +154,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { // Remove a block from the slaves that have it. This can only be used to remove // blocks that the master knows about. 
private def removeBlock(blockId: String) { - val block = blockInfo.get(blockId) + val block = blockLocations.get(blockId) if (block != null) { block._2.foreach { blockManagerId: BlockManagerId => val blockManager = blockManagerInfo.get(blockManagerId) @@ -163,11 +163,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { // Doesn't actually wait for a confirmation and the message might get lost. // If message loss becomes frequent, we should add retry logic here. blockManager.get.slaveActor ! RemoveBlock(blockId) - // Remove the block from the master's BlockManagerInfo. - blockManager.get.updateBlockInfo(blockId, StorageLevel.NONE, 0, 0) } } - blockInfo.remove(blockId) } sender ! true } @@ -202,7 +199,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { sender ! true } - private def blockUpdate( + private def updateBlockInfo( blockManagerId: BlockManagerId, blockId: String, storageLevel: StorageLevel, @@ -232,21 +229,22 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) var locations: HashSet[BlockManagerId] = null - if (blockInfo.containsKey(blockId)) { - locations = blockInfo.get(blockId)._2 + if (blockLocations.containsKey(blockId)) { + locations = blockLocations.get(blockId)._2 } else { locations = new HashSet[BlockManagerId] - blockInfo.put(blockId, (storageLevel.replication, locations)) + blockLocations.put(blockId, (storageLevel.replication, locations)) } if (storageLevel.isValid) { - locations += blockManagerId + locations.add(blockManagerId) } else { locations.remove(blockManagerId) } + // Remove the block from master tracking if it has been removed on all slaves. if (locations.size == 0) { - blockInfo.remove(blockId) + blockLocations.remove(blockId) } sender ! 
true } @@ -254,9 +252,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { private def getLocations(blockId: String) { val startTimeMs = System.currentTimeMillis() val tmp = " " + blockId + " " - if (blockInfo.containsKey(blockId)) { + if (blockLocations.containsKey(blockId)) { var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - res.appendAll(blockInfo.get(blockId)._2) + res.appendAll(blockLocations.get(blockId)._2) sender ! res.toSeq } else { var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] @@ -267,9 +265,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { private def getLocationsMultipleBlockIds(blockIds: Array[String]) { def getLocations(blockId: String): Seq[BlockManagerId] = { val tmp = blockId - if (blockInfo.containsKey(blockId)) { + if (blockLocations.containsKey(blockId)) { var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - res.appendAll(blockInfo.get(blockId)._2) + res.appendAll(blockLocations.get(blockId)._2) return res.toSeq } else { var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 4e28a7e2bc8ed0e05b9197b4a9aa092857ff7f1a..8f86e3170ed0f1b0af2874621ac5221a822dbcea 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -7,6 +7,10 @@ import akka.actor._ import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter import org.scalatest.PrivateMethodTester +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.Timeouts._ +import org.scalatest.matchers.ShouldMatchers._ +import org.scalatest.time.SpanSugar._ import spark.KryoSerializer import spark.SizeEstimator @@ -142,37 +146,46 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT 
val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) - // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) - store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) - store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false) + // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 + store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, false) // Checking whether blocks are in memory and memory size - var memStatus = master.getMemoryStatus.head._2 + val memStatus = master.getMemoryStatus.head._2 assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000") assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200") - assert(store.getSingle("a1") != None, "a1 was not in store") - assert(store.getSingle("a2") != None, "a2 was not in store") - assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a1-to-remove") != None, "a1 was not in store") + assert(store.getSingle("a2-to-remove") != None, "a2 was not in store") + assert(store.getSingle("a3-to-remove") != None, "a3 was not in store") // Checking whether master knows about the blocks or not - assert(master.getLocations("a1").size > 0, "master was not told about a1") - assert(master.getLocations("a2").size > 0, "master was not told about a2") - assert(master.getLocations("a3").size === 0, "master was told about a3") + assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1") + assert(master.getLocations("a2-to-remove").size > 0, "master was not told about a2") + assert(master.getLocations("a3-to-remove").size === 0, "master was told about a3") // Remove a1 and a2 and a3. Should be no-op for a3. 
- master.removeBlock("a1") - master.removeBlock("a2") - master.removeBlock("a3") - assert(store.getSingle("a1") === None, "a1 not removed from store") - assert(store.getSingle("a2") === None, "a2 not removed from store") - assert(master.getLocations("a1").size === 0, "master did not remove a1") - assert(master.getLocations("a2").size === 0, "master did not remove a2") - assert(store.getSingle("a3") != None, "a3 was not in store") - assert(master.getLocations("a3").size === 0, "master was told about a3") - memStatus = master.getMemoryStatus.head._2 - assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000") - assert(memStatus._2 == 2000L, "remaining memory " + memStatus._1 + " should equal 2000") + master.removeBlock("a1-to-remove") + master.removeBlock("a2-to-remove") + master.removeBlock("a3-to-remove") + + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("a1-to-remove") should be (None) + master.getLocations("a1-to-remove") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("a2-to-remove") should be (None) + master.getLocations("a2-to-remove") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("a3-to-remove") should not be (None) + master.getLocations("a3-to-remove") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + val memStatus = master.getMemoryStatus.head._2 + memStatus._1 should equal (2000L) + memStatus._2 should equal (2000L) + } } test("reregistration on heart beat") { diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 2f67bb9921e1c8dc9aa12e2a0b5212a8bbe2c599..34b93fb694478690f695099f936b4414ad386888 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -87,7 +87,7 @@ object SparkBuild extends Build { libraryDependencies ++= Seq( "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011", - 
"org.scalatest" %% "scalatest" % "1.6.1" % "test", + "org.scalatest" %% "scalatest" % "1.8" % "test", "org.scalacheck" %% "scalacheck" % "1.9" % "test", "com.novocode" % "junit-interface" % "0.8" % "test" ),