From cf52d9cade9a4df32a763073f7ad981465c91072 Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@eecs.berkeley.edu>
Date: Thu, 13 Dec 2012 21:53:30 -0800
Subject: [PATCH] Add try-finally to handle MapOutputTracker timeouts.

---
 .../main/scala/spark/MapOutputTracker.scala   | 29 +++++++++++--------
 1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 50c4183c0e..70eb9f702e 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -148,18 +148,23 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
       // We won the race to fetch the output locs; do so
       logInfo("Doing the fetch; tracker actor = " + trackerActor)
       val host = System.getProperty("spark.hostname", Utils.localHostName)
-      val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
-      val fetchedStatuses = deserializeStatuses(fetchedBytes)
-
-      logInfo("Got the output locations")
-      mapStatuses.put(shuffleId, fetchedStatuses)
-      fetching.synchronized {
-        fetching -= shuffleId
-        fetching.notifyAll()
-      }
-      if (fetchedStatuses.contains(null)) {
-        throw new FetchFailedException(null, shuffleId, -1, reduceId,
-          new Exception("Missing an output location for shuffle " + shuffleId))
+      // Use try-finally so waiters are always notified, even if the fetch fails or times out:
+      var fetchedStatuses: Array[MapStatus] = null
+      try {
+        val fetchedBytes =
+          askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
+        fetchedStatuses = deserializeStatuses(fetchedBytes)
+        logInfo("Got the output locations")
+        mapStatuses.put(shuffleId, fetchedStatuses)
+        if (fetchedStatuses.contains(null)) {
+          throw new FetchFailedException(null, shuffleId, -1, reduceId,
+            new Exception("Missing an output location for shuffle " + shuffleId))
+        }
+      } finally {
+        fetching.synchronized {
+          fetching -= shuffleId
+          fetching.notifyAll()
+        }
       }
       return fetchedStatuses.map(s =>
         (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
-- 
GitLab