Skip to content
Snippets Groups Projects
Commit cf52d9ca authored by Josh Rosen's avatar Josh Rosen
Browse files

Add try-finally to handle MapOutputTracker timeouts.

parent 8fd374df
No related branches found
No related tags found
No related merge requests found
......@@ -148,18 +148,23 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
val host = System.getProperty("spark.hostname", Utils.localHostName)
val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
val fetchedStatuses = deserializeStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
if (fetchedStatuses.contains(null)) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
// This try-finally prevents hangs due to timeouts:
var fetchedStatuses: Array[MapStatus] = null
try {
val fetchedBytes =
askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
fetchedStatuses = deserializeStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
if (fetchedStatuses.contains(null)) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
}
} finally {
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
}
return fetchedStatuses.map(s =>
(s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment