Commit d8d190ef authored by Matei Zaharia

Merge pull request #641 from mateiz/spark-1124-master

SPARK-1124: Fix infinite retries of reduce stage when a map stage failed

In the previous code, if you had a failing map stage and then tried to run reduce stages on it repeatedly, the first reduce stage would fail correctly, but the later ones would mistakenly believe that all map outputs are available and start failing infinitely with fetch failures from "null". See https://spark-project.atlassian.net/browse/SPARK-1124 for an example.

This PR also cleans up code style slightly where there was a variable named "s" and some weird map manipulation.
parents c0ef3afa 0187cef0
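
To make the null handling concrete, here is a minimal, self-contained Scala sketch of the behavioural difference described above. The MapStatus case class, host names, and object name below are illustrative stand-ins, not the real Spark types: in the old code a missing map output still produced a one-element List(null), so a rebuilt shuffle map stage reported every partition as available; the new code uses Option(locs(i)).toList and counts only non-null entries.

object NullMapOutputSketch extends App {
  // Stand-in for Spark's MapStatus; only used to illustrate the null handling.
  case class MapStatus(host: String)

  // Deserialized statuses for 3 partitions; partition 1 has no output yet (null).
  val locs: Array[MapStatus] = Array(MapStatus("host-a"), null, MapStatus("host-c"))

  // Old behaviour: List(null) still has one element, so every partition looks available.
  val oldOutputLocs = locs.map(List(_))
  val oldAvailable  = locs.size              // 3 -- wrong, partition 1 is missing

  // New behaviour: Option(null).toList is Nil, and nulls are not counted as available.
  val newOutputLocs = locs.map(Option(_).toList)
  val newAvailable  = locs.count(_ != null)  // 2 -- correct

  println(s"old: available=$oldAvailable, locs=${oldOutputLocs.toList}")
  println(s"new: available=$newAvailable, locs=${newOutputLocs.toList}")
}

With the old counting, a reduce stage scheduled on top of the partially failed map stage saw numAvailableOutputs equal to the number of partitions and skipped re-running the map tasks, which is what produced the endless fetch failures from "null".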
@@ -272,8 +272,10 @@ class DAGScheduler(
     if (mapOutputTracker.has(shuffleDep.shuffleId)) {
       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
-      for (i <- 0 until locs.size) stage.outputLocs(i) = List(locs(i))
-      stage.numAvailableOutputs = locs.size
+      for (i <- 0 until locs.size) {
+        stage.outputLocs(i) = Option(locs(i)).toList   // locs(i) will be null if missing
+      }
+      stage.numAvailableOutputs = locs.count(_ != null)
     } else {
       // Kind of ugly: need to register RDDs with the cache and map output tracker here
       // since we can't do it in the RDD constructor because # of partitions is unknown
@@ -373,25 +375,26 @@ class DAGScheduler(
     } else {
       def removeStage(stageId: Int) {
         // data structures based on Stage
-        stageIdToStage.get(stageId).foreach { s =>
-          if (running.contains(s)) {
+        for (stage <- stageIdToStage.get(stageId)) {
+          if (running.contains(stage)) {
             logDebug("Removing running stage %d".format(stageId))
-            running -= s
+            running -= stage
+          }
+          stageToInfos -= stage
+          for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
+            shuffleToMapStage.remove(k)
           }
-          stageToInfos -= s
-          shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
-            shuffleToMapStage.remove(shuffleId))
-          if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
+          if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
             logDebug("Removing pending status for stage %d".format(stageId))
           }
-          pendingTasks -= s
-          if (waiting.contains(s)) {
+          pendingTasks -= stage
+          if (waiting.contains(stage)) {
             logDebug("Removing stage %d from waiting set.".format(stageId))
-            waiting -= s
+            waiting -= stage
           }
-          if (failed.contains(s)) {
+          if (failed.contains(stage)) {
             logDebug("Removing stage %d from failed set.".format(stageId))
-            failed -= s
+            failed -= stage
           }
         }
         // data structures based on StageId
@@ -81,6 +81,19 @@ class FailureSuite extends FunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  // Run a map-reduce job in which the map stage always fails.
+  test("failure in a map stage") {
+    sc = new SparkContext("local", "test")
+    val data = sc.makeRDD(1 to 3).map(x => { throw new Exception; (x, x) }).groupByKey(3)
+    intercept[SparkException] {
+      data.collect()
+    }
+    // Make sure that running new jobs with the same map stage also fails
+    intercept[SparkException] {
+      data.collect()
+    }
+  }
+
   test("failure because task results are not serializable") {
     sc = new SparkContext("local[1,1]", "test")
     val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)