Commit b454d440 authored by Shubham Chopra, committed by Wenchen Fan

[SPARK-15354][CORE] Topology aware block replication strategies

## What changes were proposed in this pull request?

This adds implementations of pluggable block replication strategies for different resource managers that mirror the 3-replica placement used by HDFS: the first replica is placed on an executor, the second on another executor within the same rack, and the third on an executor in a different rack.
The implementation provides two pluggable classes: one runs in the driver and supplies topology information for every host at cluster start, and the other prioritizes a list of peer BlockManagerIds for replication.

The prioritization itself can be thought of as an optimization problem: find a minimal set of peers that satisfies certain objectives, and replicate to those peers first. The objectives can express richer constraints over and above the HDFS-like 3-replica strategy.
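
As a rough sketch of the two extension points (the class names, host names, and rack table below are invented for illustration; only the interfaces match those exercised by this patch):

```scala
import scala.collection.mutable
import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockReplicationPolicy, TopologyMapper}

// Driver-side plugin: resolves a host name to its topology (for example a rack) at cluster start.
class StaticRackTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) {
  // A made-up static table; a real mapper might read a file or query the resource manager.
  private val rackByHost = Map("host-1" -> "/rack-1", "host-2" -> "/rack-2")
  override def getTopologyForHost(hostname: String): Option[String] = rackByHost.get(hostname)
}

// Block-manager-side plugin: orders candidate peers; a lower index means a higher priority.
class PreferOtherRackPolicy extends BlockReplicationPolicy {
  override def prioritize(
      blockManagerId: BlockManagerId,
      peers: Seq[BlockManagerId],
      peersReplicatedTo: mutable.HashSet[BlockManagerId],
      blockId: BlockId,
      numReplicas: Int): List[BlockManagerId] = {
    val random = new Random(blockId.hashCode)
    // Toy objective: skip peers we already replicated to and our own host, then prefer
    // out-of-rack peers over in-rack peers, shuffling within each group.
    val candidates = peers.filterNot(p => peersReplicatedTo.contains(p) || p.host == blockManagerId.host)
    val (sameRack, otherRack) = candidates.partition(_.topologyInfo == blockManagerId.topologyInfo)
    (random.shuffle(otherRack) ++ random.shuffle(sameRack)).take(numReplicas).toList
  }
}
```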
## How was this patch tested?

This patch was tested with unit tests for storage, along with new unit tests to verify prioritization behaviour.

Author: Shubham Chopra <schopra31@bloomberg.net>

Closes #13932 from shubhamchopra/PrioritizerStrategy.
parent edc87d76
@@ -49,7 +49,6 @@ import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
val data: Iterator[Any],
@@ -1258,7 +1257,6 @@ private[spark] class BlockManager(
replication = 1)
val numPeersToReplicateTo = level.replication - 1
val startTime = System.nanoTime
var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
@@ -1313,7 +1311,6 @@
numPeersToReplicateTo - peersReplicatedTo.size)
}
}
logDebug(s"Replicating $blockId of ${data.size} bytes to " +
s"${peersReplicatedTo.size} peer(s) took ${(System.nanoTime - startTime) / 1e6} ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
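For orientation, here is a heavily simplified sketch of how a replication loop can consume a prioritized peer list and re-prioritize the remaining candidates after a failure. The method shape and the uploadTo callback are hypothetical; this is not the actual BlockManager.replicate code:

```scala
import scala.collection.mutable

import org.apache.spark.storage.{BlockId, BlockManagerId, BlockReplicationPolicy}

// Try peers in priority order until enough replicas exist or no candidates remain.
def replicateSketch(
    self: BlockManagerId,
    allPeers: Seq[BlockManagerId],
    policy: BlockReplicationPolicy,
    blockId: BlockId,
    numPeersToReplicateTo: Int)(uploadTo: BlockManagerId => Boolean): Set[BlockManagerId] = {
  val peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
  val peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
  var candidates = policy.prioritize(self, allPeers, peersReplicatedTo, blockId, numPeersToReplicateTo)
  while (peersReplicatedTo.size < numPeersToReplicateTo && candidates.nonEmpty) {
    val peer = candidates.head
    if (uploadTo(peer)) {
      peersReplicatedTo += peer
      candidates = candidates.tail
    } else {
      peersFailedToReplicateTo += peer
      // On failure, ask the policy again with the peers we have not yet tried,
      // requesting only the replicas that are still missing.
      val remaining = allPeers.filterNot { p =>
        peersReplicatedTo.contains(p) || peersFailedToReplicateTo.contains(p)
      }
      candidates = policy.prioritize(self, remaining, peersReplicatedTo, blockId,
        numPeersToReplicateTo - peersReplicatedTo.size)
    }
  }
  peersReplicatedTo.toSet
}
```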
@@ -53,6 +53,46 @@ trait BlockReplicationPolicy {
numReplicas: Int): List[BlockManagerId]
}
object BlockReplicationUtils {
// scalastyle:off line.size.limit
/**
* Uses Robert Floyd's sampling algorithm to find a random sample of m unique indices without
* materializing or shuffling all n candidates, while minimizing space usage. For a proof of
* correctness, see <a href="http://math.stackexchange.com/questions/178690/whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin">
* here</a>.
*
* @param n total number of indices
* @param m number of samples needed
* @param r random number generator
* @return list of m random unique indices
*/
// scalastyle:on line.size.limit
private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
val indices = (n - m + 1 to n).foldLeft(mutable.LinkedHashSet.empty[Int]) {case (set, i) =>
val t = r.nextInt(i) + 1
if (set.contains(t)) set + i else set + t
}
indices.map(_ - 1).toList
}
/**
* Get a random sample of size m from elems.
*
* @param elems collection of elements to sample from
* @param m number of samples needed
* @param r random number generator
* @tparam T element type
* @return a random list of size m. If there are fewer than m elements in elems, we just
* return all of elems in a random order
*/
def getRandomSample[T](elems: Seq[T], m: Int, r: Random): List[T] = {
if (elems.size > m) {
getSampleIds(elems.size, m, r).map(elems(_))
} else {
r.shuffle(elems).toList
}
}
}
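As a usage illustration of the sampling utility above (a standalone sketch, assuming BlockReplicationUtils is visible from the calling package):

```scala
import scala.util.Random

import org.apache.spark.storage.BlockReplicationUtils

val rng = new Random(42L)
// Draw 5 distinct indices out of 100 without shuffling all 100 candidates.
val sample = BlockReplicationUtils.getRandomSample((0 until 100).toList, 5, rng)
assert(sample.size == 5 && sample.toSet.size == 5)
assert(sample.forall(i => i >= 0 && i < 100))
// With fewer elements than requested, the helper simply returns all of them, shuffled.
val small = BlockReplicationUtils.getRandomSample(List("a", "b", "c"), 5, rng)
assert(small.toSet == Set("a", "b", "c"))
```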
@DeveloperApi
class RandomBlockReplicationPolicy
extends BlockReplicationPolicy
@@ -67,6 +107,7 @@ class RandomBlockReplicationPolicy
* @param peersReplicatedTo Set of peers already replicated to
* @param blockId BlockId of the block being replicated. This can be used as a source of
* randomness if needed.
* @param numReplicas Number of peers we need to replicate to
* @return A prioritized list of peers. The lower the index of a peer, the higher its priority.
*/
override def prioritize(
@@ -78,7 +119,7 @@ class RandomBlockReplicationPolicy
val random = new Random(blockId.hashCode)
logDebug(s"Input peers : ${peers.mkString(", ")}")
val prioritizedPeers = if (peers.size > numReplicas) {
BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
} else {
if (peers.size < numReplicas) {
logWarning(s"Expecting ${numReplicas} replicas with only ${peers.size} peer/s.")
@@ -88,26 +129,96 @@ class RandomBlockReplicationPolicy
logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
prioritizedPeers
}
}
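For context, a minimal usage sketch of the random policy (peer names invented). Note that prioritize is deterministic for a given block, because the random generator is seeded with blockId.hashCode:

```scala
import scala.collection.mutable

import org.apache.spark.storage.{BlockManagerId, RandomBlockReplicationPolicy, TestBlockId}

val policy = new RandomBlockReplicationPolicy
val self = BlockManagerId("exec-0", "host-0", 7000, None)
val peers = (1 to 10).map(i => BlockManagerId(s"exec-$i", s"host-$i", 7000 + i, None))

val first = policy.prioritize(self, peers, mutable.HashSet.empty, TestBlockId("b1"), 3)
val second = policy.prioritize(self, peers, mutable.HashSet.empty, TestBlockId("b1"), 3)
assert(first.size == 3 && first.toSet.size == 3) // distinct peers, at most numReplicas of them
assert(first == second)                          // same block id, same seed, same ordering
```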
@DeveloperApi
class BasicBlockReplicationPolicy
extends BlockReplicationPolicy
with Logging {
/**
* Method to prioritize a set of candidate peers of a block manager. This implementation
* replicates the behavior of block replication in HDFS: for a given number of replicas needed,
* we choose a peer within the same rack, then one outside the rack, and then the remaining block
* managers at random, in that order, until the required number of replicas is met.
* This works best with a total replication factor of 3, like HDFS.
*
* @param blockManagerId Id of the current BlockManager for self identification
* @param peers A list of peers of a BlockManager
* @param peersReplicatedTo Set of peers already replicated to
* @param blockId BlockId of the block being replicated. This can be used as a source of
* randomness if needed.
* @param numReplicas Number of peers we need to replicate to
* @return A prioritized list of peers. The lower the index of a peer, the higher its priority.
*/
override def prioritize(
blockManagerId: BlockManagerId,
peers: Seq[BlockManagerId],
peersReplicatedTo: mutable.HashSet[BlockManagerId],
blockId: BlockId,
numReplicas: Int): List[BlockManagerId] = {
logDebug(s"Input peers : $peers")
logDebug(s"BlockManagerId : $blockManagerId")
val random = new Random(blockId.hashCode)
// If the block manager has no topology info, the best we can do is pick peers at random.
// If it does, we look at what has already been replicated to (peersReplicatedTo) and,
// based on numReplicas, choose what is still needed.
if (blockManagerId.topologyInfo.isEmpty || numReplicas == 0) {
// no topology info for the block. The best we can do is randomly choose peers
BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
} else {
// we have topology information, we see what is left to be done from peersReplicatedTo
val doneWithinRack = peersReplicatedTo.exists(_.topologyInfo == blockManagerId.topologyInfo)
val doneOutsideRack = peersReplicatedTo.exists { p =>
p.topologyInfo.isDefined && p.topologyInfo != blockManagerId.topologyInfo
}
if (doneOutsideRack && doneWithinRack) {
// we are done, we just return a random sample
BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
} else {
// we separate peers within and outside rack
val (inRackPeers, outOfRackPeers) = peers
.filter(_.host != blockManagerId.host)
.partition(_.topologyInfo == blockManagerId.topologyInfo)
val peerWithinRack = if (doneWithinRack) {
// we are done with in-rack replication, so we don't need any more in-rack peers
Seq.empty
} else {
if (inRackPeers.isEmpty) {
Seq.empty
} else {
Seq(inRackPeers(random.nextInt(inRackPeers.size)))
}
}
val peerOutsideRack = if (doneOutsideRack || numReplicas - peerWithinRack.size <= 0) {
Seq.empty
} else {
if (outOfRackPeers.isEmpty) {
Seq.empty
} else {
Seq(outOfRackPeers(random.nextInt(outOfRackPeers.size)))
}
}
val priorityPeers = peerWithinRack ++ peerOutsideRack
val numRemainingPeers = numReplicas - priorityPeers.size
val remainingPeers = if (numRemainingPeers > 0) {
val rPeers = peers.filter(p => !priorityPeers.contains(p))
BlockReplicationUtils.getRandomSample(rPeers, numRemainingPeers, random)
} else {
Seq.empty
}
(priorityPeers ++ remainingPeers).toList
}
}
}
}
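To make the intended placement concrete, a small sketch with invented hosts and racks. With nothing replicated yet and numReplicas = 3, the first pick is an in-rack peer, the second an off-rack peer, and the remaining slot is filled from a random sample of the rest:

```scala
import scala.collection.mutable

import org.apache.spark.storage.{BasicBlockReplicationPolicy, BlockManagerId, TestBlockId}

val policy = new BasicBlockReplicationPolicy
val self = BlockManagerId("exec-0", "host-0", 7000, Some("/rack-1"))
val peers = Seq(
  BlockManagerId("exec-1", "host-1", 7001, Some("/rack-1")), // same rack as self
  BlockManagerId("exec-2", "host-2", 7002, Some("/rack-2")), // different rack
  BlockManagerId("exec-3", "host-3", 7003, Some("/rack-2"))  // different rack
)

val prioritized = policy.prioritize(self, peers, mutable.HashSet.empty, TestBlockId("b1"), 3)
assert(prioritized.head.topologyInfo == self.topologyInfo)   // in-rack peer first
assert(prioritized(1).topologyInfo != self.topologyInfo)     // off-rack peer second
assert(prioritized.toSet.size == 3)                          // all picks are distinct
```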
@@ -28,6 +28,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -36,6 +37,7 @@ import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.util.Utils
trait BlockManagerReplicationBehavior extends SparkFunSuite
with Matchers
@@ -43,6 +45,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
with LocalSparkContext {
val conf: SparkConf
protected var rpcEnv: RpcEnv = null
protected var master: BlockManagerMaster = null
protected lazy val securityMgr = new SecurityManager(conf)
@@ -55,7 +58,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
protected val allStores = new ArrayBuffer[BlockManager]
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
protected lazy val serializer = new KryoSerializer(conf)
// Implicitly convert strings to BlockIds for test clarity.
@@ -471,7 +473,7 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
conf.set("spark.storage.replication.proactive", "true")
conf.set("spark.storage.exceptionOnPinLeak", "true")
(2 to 5).foreach { i =>
test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") {
testProactiveReplication(i)
}
@@ -524,3 +526,30 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
}
}
}
class DummyTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
// number of racks to test with
val numRacks = 3
/**
* Gets the topology information given the host name
*
* @param hostname Hostname
* @return random topology
*/
override def getTopologyForHost(hostname: String): Option[String] = {
Some(s"/Rack-${Utils.random.nextInt(numRacks)}")
}
}
class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationBehavior {
val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test")
conf.set("spark.kryoserializer.buffer", "1m")
conf.set(
"spark.storage.replication.policy",
classOf[BasicBlockReplicationPolicy].getName)
conf.set(
"spark.storage.replication.topologyMapper",
classOf[DummyTopologyMapper].getName)
}
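Outside the test suite, the same two settings select the policy and the topology mapper for an application. A minimal sketch; com.example.RackTopologyMapper stands in for a hypothetical user-provided TopologyMapper subclass:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.BasicBlockReplicationPolicy

val conf = new SparkConf()
  .setAppName("topology-aware-replication-demo")
  // Prioritize replica placement HDFS-style: one in-rack peer, one off-rack peer, rest random.
  .set("spark.storage.replication.policy", classOf[BasicBlockReplicationPolicy].getName)
  // Driver-side mapper that resolves each host to its rack (hypothetical user class).
  .set("spark.storage.replication.topologyMapper", "com.example.RackTopologyMapper")
// Persisting with a replicated storage level, e.g. StorageLevel.MEMORY_AND_DISK_2, then places
// the extra replica according to the configured policy.
```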
@@ -18,34 +18,34 @@
package org.apache.spark.storage
import scala.collection.mutable
import scala.util.Random
import org.scalatest.{BeforeAndAfter, Matchers}
import org.apache.spark.{LocalSparkContext, SparkFunSuite}
class RandomBlockReplicationPolicyBehavior extends SparkFunSuite
with Matchers
with BeforeAndAfter
with LocalSparkContext {
// Implicitly convert strings to BlockIds for test clarity.
protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
val replicationPolicy: BlockReplicationPolicy = new RandomBlockReplicationPolicy
val blockId = "test-block"
/**
* Test if we get the required number of peers when using random sampling from
* BlockReplicationPolicy
*/
test("block replication - random block replication policy") {
val numBlockManagers = 10
val storeSize = 1000
val blockManagers = generateBlockManagerIds(numBlockManagers, Seq("/Rack-1"))
val candidateBlockManager = BlockManagerId("test-store", "localhost", 1000, None)
(1 to 10).foreach { numReplicas =>
logDebug(s"Num replicas : $numReplicas")
val randomPeers = replicationPolicy.prioritize(
candidateBlockManager,
@@ -68,7 +68,60 @@ class BlockReplicationPolicySuite extends SparkFunSuite
logDebug(s"Random peers : ${secondPass.mkString(", ")}")
assert(secondPass.toSet.size === numReplicas)
}
}
protected def generateBlockManagerIds(count: Int, racks: Seq[String]): Seq[BlockManagerId] = {
(1 to count).map{i =>
BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(racks(Random.nextInt(racks.size))))
}
}
}
class TopologyAwareBlockReplicationPolicyBehavior extends RandomBlockReplicationPolicyBehavior {
override val replicationPolicy = new BasicBlockReplicationPolicy
test("All peers in the same rack") {
val racks = Seq("/default-rack")
val numBlockManager = 10
(1 to 10).foreach {numReplicas =>
val peers = generateBlockManagerIds(numBlockManager, racks)
val blockManager = BlockManagerId("Driver", "Host-driver", 10001, Some(racks.head))
val prioritizedPeers = replicationPolicy.prioritize(
blockManager,
peers,
mutable.HashSet.empty,
blockId,
numReplicas
)
assert(prioritizedPeers.toSet.size == numReplicas)
assert(prioritizedPeers.forall(p => p.host != blockManager.host))
}
}
test("Peers in 2 racks") {
val racks = Seq("/Rack-1", "/Rack-2")
(1 to 10).foreach {numReplicas =>
val peers = generateBlockManagerIds(10, racks)
val blockManager = BlockManagerId("Driver", "Host-driver", 9001, Some(racks.head))
val prioritizedPeers = replicationPolicy.prioritize(
blockManager,
peers,
mutable.HashSet.empty,
blockId,
numReplicas
)
assert(prioritizedPeers.toSet.size == numReplicas)
val priorityPeers = prioritizedPeers.take(2)
assert(priorityPeers.forall(p => p.host != blockManager.host))
if(numReplicas > 1) {
// both these conditions should be satisfied when numReplicas > 1
assert(priorityPeers.exists(p => p.topologyInfo == blockManager.topologyInfo))
assert(priorityPeers.exists(p => p.topologyInfo != blockManager.topologyInfo))
}
}
}
}