Commit 548856a2 authored by Matei Zaharia

Merge remote-tracking branch 'woggling/remove-machines'

Conflicts:
	core/src/main/scala/spark/scheduler/DAGScheduler.scala
parents 1dd82743 5c742229
core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -72,8 +72,13 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   val mapOutputTracker = env.mapOutputTracker
   val blockManagerMaster = env.blockManager.master
 
-  val deadHosts = new HashSet[String]  // TODO: The code currently assumes these can't come back;
-                                       // that's not going to be a realistic assumption in general
+  // For tracking failed nodes, we use the MapOutputTracker's generation number, which is
+  // sent with every task. When we detect a node failing, we note the current generation number
+  // and failed host, increment it for new tasks, and use this to ignore stray ShuffleMapTask
+  // results.
+  // TODO: Garbage collect information about failure generations when we know there are no more
+  // stray messages to detect.
+  val failedGeneration = new HashMap[String, Long]
 
   val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
   val running = new HashSet[Stage] // Stages we are running right now
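
For readers skimming the change, the sketch below is a small, self-contained model of the mechanism the new comment describes (not Spark code; the names TaskResult and FailureLog are invented for illustration): every task carries the MapOutputTracker generation it was launched under, a host failure records the generation at which it was noticed, and a ShuffleMapTask result is treated as stale when its generation is no newer than that recorded failure.

// Hypothetical, simplified model of generation-based staleness filtering.
// Not part of this commit; for illustration only.
import scala.collection.mutable

final case class TaskResult(host: String, generation: Long)

class FailureLog {
  // host -> generation at which the host was last declared lost
  private val failedGeneration = new mutable.HashMap[String, Long]

  // Record that `host` failed while the tracker was at `generation`.
  def hostLost(host: String, generation: Long): Unit =
    failedGeneration(host) = generation

  // A result is stale if its host failed and the task that produced it was
  // launched at or before the recorded failure generation.
  def isStale(result: TaskResult): Boolean =
    failedGeneration.get(result.host).exists(result.generation <= _)
}

Under this model a task launched after the failure was noticed carries a larger generation, so its result is accepted again, which is what lets a recovered host contribute fresh map outputs.
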
@@ -432,7 +437,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
             val status = event.result.asInstanceOf[MapStatus]
             val host = status.address.ip
             logInfo("ShuffleMapTask finished with host " + host)
-            if (!deadHosts.contains(host)) {   // TODO: Make sure hostnames are consistent with Mesos
+            if (failedGeneration.contains(host) && smt.generation <= failedGeneration(host)) {
+              logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + host)
+            } else {
               stage.addOutputLoc(smt.partition, status)
             }
             if (running.contains(stage) && pendingTasks(stage).isEmpty) {
@@ -442,9 +449,16 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
               logInfo("waiting: " + waiting)
               logInfo("failed: " + failed)
               if (stage.shuffleDep != None) {
+                // We supply true to increment the generation number here in case this is a
+                // recomputation of the map outputs. In that case, some nodes may have cached
+                // locations with holes (from when we detected the error) and will need the
+                // generation incremented to refetch them.
+                // TODO: Only increment the generation number if this is not the first time
+                // we registered these map outputs.
                 mapOutputTracker.registerMapOutputs(
                   stage.shuffleDep.get.shuffleId,
-                  stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray)
+                  stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
+                  true)
               }
               clearCacheLocs()
               if (stage.outputLocs.count(_ == Nil) != 0) {
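
The `true` argument added above is the flag that tells the tracker to bump its generation when these map outputs are (re)registered. The tracker itself is not part of this diff; the following is a rough, hypothetical sketch (class and method names are illustrative, not Spark's actual MapOutputTracker) of why bumping a counter on re-registration is enough to make executors drop location lists they cached before the failure.

// Toy registry illustrating the idea; not the real MapOutputTracker.
import scala.collection.mutable

class ToyMapOutputRegistry {
  private val outputs = new mutable.HashMap[Int, Array[String]] // shuffleId -> location per map partition
  private var generation = 0L

  def getGeneration: Long = synchronized { generation }

  def incrementGeneration(): Unit = synchronized { generation += 1 }

  // Re-registering with changeGeneration = true invalidates locations that
  // executors cached under an older generation, so they will ask for them again.
  def registerMapOutputs(shuffleId: Int, locs: Array[String], changeGeneration: Boolean = false): Unit =
    synchronized {
      outputs(shuffleId) = locs
      if (changeGeneration) {
        generation += 1
      }
    }
}

An executor that compares its cached generation against getGeneration before reusing locations would notice the bump and refetch, which mirrors the "cached locations with holes" situation the comment describes.
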
@@ -498,7 +512,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
         lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
         // TODO: mark the host as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          handleHostLost(bmAddress.ip)
+          handleHostLost(bmAddress.ip, Some(task.generation))
         }
 
       case other =>
@@ -510,11 +524,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   /**
    * Responds to a host being lost. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use hostLost() to post a host lost event from outside.
+   *
+   * Optionally the generation during which the failure was caught can be passed to avoid allowing
+   * stray fetch failures from possibly retriggering the detection of a node as lost.
    */
-  def handleHostLost(host: String) {
-    if (!deadHosts.contains(host)) {
-      logInfo("Host lost: " + host)
-      deadHosts += host
+  def handleHostLost(host: String, maybeGeneration: Option[Long] = None) {
+    val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration)
+    if (!failedGeneration.contains(host) || failedGeneration(host) < currentGeneration) {
+      failedGeneration(host) = currentGeneration
+      logInfo("Host lost: " + host + " (generation " + currentGeneration + ")")
       env.blockManager.master.notifyADeadHost(host)
       // TODO: This will be really slow if we keep accumulating shuffle map stages
       for ((shuffleId, stage) <- shuffleToMapStage) {
@@ -522,7 +540,13 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
         val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
         mapOutputTracker.registerMapOutputs(shuffleId, locs, true)
       }
+      if (shuffleToMapStage.isEmpty) {
+        mapOutputTracker.incrementGeneration()
+      }
       clearCacheLocs()
+    } else {
+      logDebug("Additional host lost message for " + host +
+               "(generation " + currentGeneration + ")")
     }
   }
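
Why pass Some(task.generation) from the fetch-failure path? A fetch failure can arrive long after the host was already marked lost and recomputation started; comparing the report's generation against the recorded one keeps such stragglers from re-marking the host. The fragment below is a hypothetical, standalone walk-through of that guard (plain values, invented names, not scheduler code).

// Illustrative only: models the failedGeneration guard with plain values.
object GenerationGuardExample extends App {
  var failedGeneration = Map.empty[String, Long]

  def handleHostLost(host: String, maybeGeneration: Option[Long], trackerGeneration: Long): Unit = {
    val generation = maybeGeneration.getOrElse(trackerGeneration)
    if (!failedGeneration.get(host).exists(_ >= generation)) {
      failedGeneration += host -> generation
      println(s"Host lost: $host (generation $generation)")
    } else {
      println(s"Additional host lost message for $host (generation $generation), ignored")
    }
  }

  handleHostLost("worker-1", Some(3L), trackerGeneration = 5L) // first report: host marked lost
  handleHostLost("worker-1", Some(3L), trackerGeneration = 7L) // stray report from an old task: ignored
  handleHostLost("worker-1", Some(8L), trackerGeneration = 9L) // genuinely new failure: marked again
}

The first call records generation 3 for worker-1; the duplicate report carrying the same old generation is ignored even though the tracker has moved on, while a report with a newer generation marks the host lost again.
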
core/src/test/scala/spark/DistributedSuite.scala
@@ -188,4 +188,73 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     val values = sc.parallelize(1 to 2, 2).map(x => System.getenv("TEST_VAR")).collect()
     assert(values.toSeq === Seq("TEST_VALUE", "TEST_VALUE"))
   }
+
+  test("recover from node failures") {
+    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+    DistributedSuite.amMaster = true
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(Seq(true, true), 2)
+    assert(data.count === 2) // force executors to start
+    val masterId = SparkEnv.get.blockManager.blockManagerId
+    assert(data.map(markNodeIfIdentity).collect.size === 2)
+    assert(data.map(failOnMarkedIdentity).collect.size === 2)
+  }
+
+  test("recover from repeated node failures during shuffle-map") {
+    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+    DistributedSuite.amMaster = true
+    sc = new SparkContext(clusterUrl, "test")
+    for (i <- 1 to 3) {
+      val data = sc.parallelize(Seq(true, false), 2)
+      assert(data.count === 2)
+      assert(data.map(markNodeIfIdentity).collect.size === 2)
+      assert(data.map(failOnMarkedIdentity).map(x => x -> x).groupByKey.count === 2)
+    }
+  }
+
+  test("recover from repeated node failures during shuffle-reduce") {
+    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+    DistributedSuite.amMaster = true
+    sc = new SparkContext(clusterUrl, "test")
+    for (i <- 1 to 3) {
+      val data = sc.parallelize(Seq(true, true), 2)
+      assert(data.count === 2)
+      assert(data.map(markNodeIfIdentity).collect.size === 2)
+      // This relies on mergeCombiners being used to perform the actual reduce for this
+      // test to actually be testing what it claims.
+      val grouped = data.map(x => x -> x).combineByKey(
+        x => x,
+        (x: Boolean, y: Boolean) => x,
+        (x: Boolean, y: Boolean) => failOnMarkedIdentity(x)
+      )
+      assert(grouped.collect.size === 1)
+    }
+  }
+}
+
+object DistributedSuite {
+  // Indicates whether this JVM is marked for failure.
+  var mark = false
+
+  // Set by test to remember if we are in the driver program so we can assert
+  // that we are not.
+  var amMaster = false
+
+  // Act like an identity function, but if the argument is true, set mark to true.
+  def markNodeIfIdentity(item: Boolean): Boolean = {
+    if (item) {
+      assert(!amMaster)
+      mark = true
+    }
+    item
+  }
+
+  // Act like an identity function, but if mark was set to true previously, fail,
+  // crashing the entire JVM.
+  def failOnMarkedIdentity(item: Boolean): Boolean = {
+    if (mark) {
+      System.exit(42)
+    }
+    item
+  }
 }
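
A note on the shuffle-reduce test: its inline comment points out that the "actual reduce" happens in mergeCombiners. The toy snippet below (plain Scala, no RDDs, names invented) spells out the roles of the three combineByKey functions and why putting the crash only in mergeCombiners makes the failure land on the reduce side of the shuffle.

// Illustrative only: the three combineByKey functions modeled on plain values.
object CombineByKeyRoles extends App {
  // createCombiner / mergeValue build a per-key combiner within a single map task;
  // mergeCombiners merges combiners produced by different map tasks, which (as the
  // test's comment notes) is where the reduce work happens after the shuffle.
  val createCombiner: Int => List[Int] = v => List(v)
  val mergeValue: (List[Int], Int) => List[Int] = (c, v) => v :: c
  val mergeCombiners: (List[Int], List[Int]) => List[Int] = (c1, c2) => c1 ::: c2

  // Combiners for one key produced independently by two map tasks:
  val fromMapTask1 = mergeValue(createCombiner(1), 2) // List(2, 1)
  val fromMapTask2 = createCombiner(4)                // List(4)

  // Merging them is the reduce-side step; a function that crashes here
  // (like failOnMarkedIdentity in the test) crashes during the shuffle-reduce.
  println(mergeCombiners(fromMapTask1, fromMapTask2)) // List(2, 1, 4)
}
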