Skip to content
Snippets Groups Projects
Commit 7d71b9a5 authored by Josh Rosen's avatar Josh Rosen Committed by Matei Zaharia
Browse files

Fix NullPointerException caused by unregistered map outputs.

parent 6adc7c96
No related branches found
No related tags found
No related merge requests found
......@@ -156,6 +156,10 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
fetching -= shuffleId
fetching.notifyAll()
}
if (fetchedStatuses.contains(null)) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
}
return fetchedStatuses.map(s =>
(s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
} else {
......
......@@ -479,8 +479,10 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
") for resubmision due to a fetch failure")
// Mark the map whose fetch failed as broken in the map stage
val mapStage = shuffleToMapStage(shuffleId)
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}
logInfo("The failed fetch was from " + mapStage + " (" + mapStage.origin +
"); marking it for resubmission")
failed += mapStage
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment