From a250933c625ed720d15a0e479e9c51113605b102 Mon Sep 17 00:00:00 2001
From: Shubham Chopra <schopra31@bloomberg.net>
Date: Tue, 28 Mar 2017 09:47:29 +0800
Subject: [PATCH] [SPARK-19803][CORE][TEST] Proactive replication test failures

## What changes were proposed in this pull request?

Executors cache a list of their peers that is refreshed by default every minute. The cached stale references were randomly being used for replication. Since those executors had been removed from the master, they no longer appeared in the block locations reported by the master.

This was fixed by:
1. Refreshing the peer cache in the block manager before trying to pro-actively replicate. This eliminates the chance of replicating to a failed executor.
2. Explicitly stopping the block manager in the tests. This shuts down the RPC endpoint used by the block manager. This way, even if a block manager tries to replicate using a stale reference, the replication logic should take care of refreshing the list of peers after the failure.

## How was this patch tested?

Tested manually

Author: Shubham Chopra <schopra31@bloomberg.net>
Author: Kay Ousterhout <kayousterhout@gmail.com>
Author: Shubham Chopra <shubhamchopra@users.noreply.github.com>

Closes #17325 from shubhamchopra/SPARK-19803.
---
 .../spark/storage/BlockInfoManager.scala       |  6 ++++
 .../apache/spark/storage/BlockManager.scala    |  6 +++-
 .../BlockManagerReplicationSuite.scala         | 29 ++++++++++++-------
 3 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 490d45d12b..3db59837fb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -371,6 +371,12 @@ private[storage] class BlockInfoManager extends Logging {
     blocksWithReleasedLocks
   }
 
+  /** Returns the number of locks held by the given task. Used only for testing. */
+  private[storage] def getTaskLockCount(taskAttemptId: TaskAttemptId): Int = {
+    readLocksByTask.get(taskAttemptId).map(_.size()).getOrElse(0) +
+      writeLocksByTask.get(taskAttemptId).map(_.size).getOrElse(0)
+  }
+
   /**
    * Returns the number of blocks tracked.
    */
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 245d94ac4f..991346a40a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1187,7 +1187,7 @@ private[spark] class BlockManager(
       blockId: BlockId,
       existingReplicas: Set[BlockManagerId],
       maxReplicas: Int): Unit = {
-    logInfo(s"Pro-actively replicating $blockId")
+    logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
     blockInfoManager.lockForReading(blockId).foreach { info =>
       val data = doGetLocalBytes(blockId, info)
       val storageLevel = StorageLevel(
@@ -1196,9 +1196,13 @@ private[spark] class BlockManager(
         useOffHeap = info.level.useOffHeap,
         deserialized = info.level.deserialized,
         replication = maxReplicas)
+      // we know we are called as a result of an executor removal, so we refresh peer cache
+      // this way, we won't try to replicate to a missing executor with a stale reference
+      getPeers(forceFetch = true)
       try {
         replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
       } finally {
+        logDebug(s"Releasing lock for $blockId")
         releaseLock(blockId)
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index d907add920..d5715f8469 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -493,27 +493,34 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
     assert(blockLocations.size === replicationFactor)
 
     // remove a random blockManager
-    val executorsToRemove = blockLocations.take(replicationFactor - 1)
+    val executorsToRemove = blockLocations.take(replicationFactor - 1).toSet
     logInfo(s"Removing $executorsToRemove")
-    executorsToRemove.foreach{exec =>
-      master.removeExecutor(exec.executorId)
+    initialStores.filter(bm => executorsToRemove.contains(bm.blockManagerId)).foreach { bm =>
+      master.removeExecutor(bm.blockManagerId.executorId)
+      bm.stop()
       // giving enough time for replication to happen and new block be reported to master
-      Thread.sleep(200)
+      eventually(timeout(5 seconds), interval(100 millis)) {
+        val newLocations = master.getLocations(blockId).toSet
+        assert(newLocations.size === replicationFactor)
+      }
     }
 
-    val newLocations = eventually(timeout(5 seconds), interval(10 millis)) {
+    val newLocations = eventually(timeout(5 seconds), interval(100 millis)) {
       val _newLocations = master.getLocations(blockId).toSet
       assert(_newLocations.size === replicationFactor)
       _newLocations
     }
     logInfo(s"New locations : $newLocations")
-    // there should only be one common block manager between initial and new locations
-    assert(newLocations.intersect(blockLocations.toSet).size === 1)
-    // check if all the read locks have been released
-    initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
-      val locks = bm.releaseAllLocksForTask(BlockInfo.NON_TASK_WRITER)
-      assert(locks.size === 0, "Read locks unreleased!")
+
+    // new locations should not contain stopped block managers
+    assert(newLocations.forall(bmId => !executorsToRemove.contains(bmId)),
+      "New locations contain stopped block managers.")
+
+    // Make sure all locks have been released.
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
+        assert(bm.blockInfoManager.getTaskLockCount(BlockInfo.NON_TASK_WRITER) === 0)
+      }
     }
   }
 }
--
GitLab
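Editor's note: the two sketches below are illustrative appendices, not part of the patch.

The first change described above refreshes the peer cache before picking replication targets, because the caller already knows an executor was just removed. Here is a hypothetical, dependency-free sketch of that idea: a TTL-based peer cache whose lookup can be forced to refetch so a stale entry cannot be chosen. `PeerCache`, `PeerId`, and `fetchFromMaster` are illustrative names only; the real call in Spark is the `getPeers(forceFetch = true)` shown in the BlockManager diff.

```scala
// Hypothetical model of "refresh the peer cache before pro-active replication".
// None of these classes are Spark's; they only illustrate the force-refresh idea.
import scala.collection.mutable

final case class PeerId(executorId: String)

final class PeerCache(fetchFromMaster: () => Seq[PeerId], ttlMs: Long) {
  private var cachedPeers: Seq[PeerId] = Seq.empty
  private var lastFetchMs: Long = 0L

  /** Return cached peers, refetching from the master if forced or if the TTL expired. */
  def getPeers(forceFetch: Boolean): Seq[PeerId] = synchronized {
    val now = System.currentTimeMillis()
    if (forceFetch || cachedPeers.isEmpty || now - lastFetchMs > ttlMs) {
      cachedPeers = fetchFromMaster()
      lastFetchMs = now
    }
    cachedPeers
  }
}

object ProactiveReplicationSketch {
  def main(args: Array[String]): Unit = {
    val livePeers = mutable.Set(PeerId("exec-1"), PeerId("exec-2"), PeerId("exec-3"))
    val cache = new PeerCache(() => livePeers.toSeq, ttlMs = 60000L)

    cache.getPeers(forceFetch = false)   // warm the cache
    livePeers -= PeerId("exec-2")        // an executor is removed

    // We were told an executor died, so bypass the TTL before replicating;
    // the stale entry for exec-2 can no longer be selected as a target.
    val targets = cache.getPeers(forceFetch = true)
    println(s"Replication candidates: $targets")
  }
}
```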
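The test changes replace a fixed `Thread.sleep(200)` with ScalaTest's `eventually`, which retries an assertion at a given interval until it passes or a timeout expires. The following minimal, self-contained sketch of that polling pattern uses only ScalaTest; `replicaCount` and the background thread are stand-ins for `master.getLocations(blockId)` and the re-replication work, not Spark code.

```scala
// Minimal sketch of the eventually(timeout, interval) polling pattern used in the
// patched test. replicaCount stands in for the number of reported block locations,
// and the background thread stands in for re-replication finishing after a delay.
import java.util.concurrent.atomic.AtomicInteger

import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch {
  def main(args: Array[String]): Unit = {
    val replicationFactor = 2
    val replicaCount = new AtomicInteger(1)   // one replica was just lost

    // "Replication" restores the second replica after some delay.
    new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(300); replicaCount.incrementAndGet() }
    }).start()

    // Retry the assertion every 100 ms for up to 5 seconds instead of a fixed
    // sleep, which is either too short (flaky) or too long (slow).
    eventually(timeout(5.seconds), interval(100.millis)) {
      assert(replicaCount.get() == replicationFactor)
    }
    println("Replication observed")
  }
}
```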