diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 08d2956782b9c6d163870072e266a8f282d45850..b2c80d8eff8fad3a12f98a3f1a6acfb6e542be06 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -139,8 +139,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
               case e: InterruptedException =>
             }
           }
-          return mapStatuses.get(shuffleId).map(status =>
-            (status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId))))
+          return MapOutputTracker.convertMapStatuses(shuffleId, reduceId,
+                                                     mapStatuses.get(shuffleId))
         } else {
           fetching += shuffleId
         }
@@ -156,21 +156,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
         fetchedStatuses = deserializeStatuses(fetchedBytes)
         logInfo("Got the output locations")
         mapStatuses.put(shuffleId, fetchedStatuses)
-        if (fetchedStatuses.contains(null)) {
-          throw new FetchFailedException(null, shuffleId, -1, reduceId,
-            new Exception("Missing an output location for shuffle " + shuffleId))
-        }
       } finally {
         fetching.synchronized {
           fetching -= shuffleId
           fetching.notifyAll()
         }
       }
-      return fetchedStatuses.map(s =>
-        (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
     } else {
-      return statuses.map(s =>
-        (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
     }
   }
 
@@ -258,6 +252,28 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
 private[spark] object MapOutputTracker {
   private val LOG_BASE = 1.1
 
+  // Convert an array of MapStatuses to locations and sizes for a given reduce ID. If
+  // any of the statuses is null (indicating a missing location due to a failed mapper),
+  // throw a FetchFailedException.
+  def convertMapStatuses(
+        shuffleId: Int,
+        reduceId: Int,
+        statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
+    if (statuses == null) {
+      throw new FetchFailedException(null, shuffleId, -1, reduceId,
+        new Exception("Missing all output locations for shuffle " + shuffleId))
+    }
+    statuses.map {
+      status => 
+        if (status == null) {
+          throw new FetchFailedException(null, shuffleId, -1, reduceId,
+            new Exception("Missing an output location for shuffle " + shuffleId))
+        } else {
+          (status.address, decompressSize(status.compressedSizes(reduceId)))
+        }
+    }
+  }
+
   /**
    * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
    * We do this by encoding the log base 1.1 of the size as an integer, which can support
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 0e7007459d58dcc2f2f8bf45443cd90c828e8dc6..aeed5d2f32a70f75b3984e5e57074750024ef693 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -134,7 +134,7 @@ private object Utils extends Logging {
    */
   def fetchFile(url: String, targetDir: File) {
     val filename = url.split("/").last
-    val tempDir = System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))
+    val tempDir = getLocalDir
     val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
@@ -204,6 +204,15 @@ private object Utils extends Logging {
     FileUtil.chmod(filename, "a+x")
   }
 
+  /**
+   * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
+   * return a single directory, even though the spark.local.dir property might be a list of
+   * multiple paths.
+   */
+  def getLocalDir: String = {
+    System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
+  }
+
   /**
    * Shuffle the elements of a collection into a random order, returning the
    * result in a new collection. Unlike scala.util.Random.shuffle, this method
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index 7eb4ddb74f98919baa31658379bd63674c9e2880..856a4683a95da04cb8a5e4a659e9b78b52a7cba0 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -89,7 +89,7 @@ private object HttpBroadcast extends Logging {
   }
 
   private def createServer() {
-    broadcastDir = Utils.createTempDir()
+    broadcastDir = Utils.createTempDir(Utils.getLocalDir)
     server = new HttpServer(broadcastDir)
     server.start()
     serverUri = server.uri
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 915f71ba9f12df99028d7e63452584f6fc0f16ce..a29bf974d247ec2ed2445b2e4845bf35d78b18d5 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -24,9 +24,6 @@ private[spark] class StandaloneExecutorBackend(
   with ExecutorBackend
   with Logging {
 
-  val threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
   var master: ActorRef = null
 
   override def preStart() {
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index e492279b4ec6444dfc793f0a0a20309c60b8c399..2aad7956b41c4414ea3a15f57fbeb5d1099039f5 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -15,9 +15,11 @@ private[spark] class ResultTask[T, U](
 
   override def run(attemptId: Long): U = {
     val context = new TaskContext(stageId, partition, attemptId)
-    val result = func(context, rdd.iterator(split, context))
-    context.executeOnCompleteCallbacks()
-    result
+    try {
+      func(context, rdd.iterator(split, context))
+    } finally {
+      context.executeOnCompleteCallbacks()
+    }
   }
 
   override def preferredLocations: Seq[String] = locs
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 5b4b19896046d204451ad33ad9fc8b0662c6d082..d3dd3a8fa4930cdc5dcf2cd8b656d5ce03cba272 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -1,12 +1,18 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 
 import akka.actor._
 import spark.scheduler.MapStatus
 import spark.storage.BlockManagerId
+import spark.util.AkkaUtils
 
-class MapOutputTrackerSuite extends FunSuite {
+class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
+  after {
+    System.clearProperty("spark.master.port")
+  }
+ 
   test("compressSize") {
     assert(MapOutputTracker.compressSize(0L) === 0)
     assert(MapOutputTracker.compressSize(1L) === 1)
@@ -71,6 +77,36 @@ class MapOutputTrackerSuite extends FunSuite {
     // The remaining reduce task might try to grab the output dispite the shuffle failure;
     // this should cause it to fail, and the scheduler will ignore the failure due to the
     // stage already being aborted.
-    intercept[Exception] { tracker.getServerStatuses(10, 1) }
+    intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+  }
+
+  test("remote fetch") {
+    System.clearProperty("spark.master.host")
+    val (actorSystem, boundPort) =
+      AkkaUtils.createActorSystem("test", "localhost", 0)
+    System.setProperty("spark.master.port", boundPort.toString)
+    val masterTracker = new MapOutputTracker(actorSystem, true)
+    val slaveTracker = new MapOutputTracker(actorSystem, false)
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.incrementGeneration()
+    slaveTracker.updateGeneration(masterTracker.getGeneration)
+    intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+    val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+    masterTracker.registerMapOutput(10, 0, new MapStatus(
+      new BlockManagerId("hostA", 1000), Array(compressedSize1000)))
+    masterTracker.incrementGeneration()
+    slaveTracker.updateGeneration(masterTracker.getGeneration)
+    assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+           Seq((new BlockManagerId("hostA", 1000), size1000)))
+
+    masterTracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+    masterTracker.incrementGeneration()
+    slaveTracker.updateGeneration(masterTracker.getGeneration)
+    intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+    // failure should be cached
+    intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
   }
 }
diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f9378773406d309ff980cb9981458efba4469697
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
@@ -0,0 +1,43 @@
+package spark.scheduler
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import spark.TaskContext
+import spark.RDD
+import spark.SparkContext
+import spark.Split
+
+class TaskContextSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
+  }
+
+  test("Calls executeOnCompleteCallbacks after failure") {
+    var completed = false
+    sc = new SparkContext("local", "test")
+    val rdd = new RDD[String](sc) {
+      override val splits = Array[Split](StubSplit(0))
+      override val dependencies = List()
+      override def compute(split: Split, context: TaskContext) = {
+        context.addOnCompleteCallback(() => completed = true)
+        sys.error("failed")
+      }
+    }
+    val func = (c: TaskContext, i: Iterator[String]) => i.next
+    val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
+    intercept[RuntimeException] {
+      task.run(0)
+    }
+    assert(completed === true)
+  }
+
+  case class StubSplit(val index: Int) extends Split
+}
\ No newline at end of file
diff --git a/pyspark b/pyspark
index 9e89d51ba2125bb04fe079ae751be0db1ba345b0..ab7f4f50c01bd7f34b49a05e8bde24fa756e573d 100755
--- a/pyspark
+++ b/pyspark
@@ -6,6 +6,13 @@ FWDIR="$(cd `dirname $0`; pwd)"
 # Export this as SPARK_HOME
 export SPARK_HOME="$FWDIR"
 
+# Exit if the user hasn't compiled Spark
+if [ ! -e "$SPARK_HOME/repl/target" ]; then
+  echo "Failed to find Spark classes in $SPARK_HOME/repl/target" >&2
+  echo "You need to compile Spark before running this program" >&2
+  exit 1
+fi
+
 # Load environment variables from conf/spark-env.sh, if it exists
 if [ -e $FWDIR/conf/spark-env.sh ] ; then
   . $FWDIR/conf/spark-env.sh
diff --git a/run b/run
index ca234553863ae280c8101589c1bb775a762793df..eb93db66db310aeaaa76826befb4aa5c19954bb7 100755
--- a/run
+++ b/run
@@ -65,6 +65,13 @@ EXAMPLES_DIR="$FWDIR/examples"
 BAGEL_DIR="$FWDIR/bagel"
 PYSPARK_DIR="$FWDIR/python"
 
+# Exit if the user hasn't compiled Spark
+if [ ! -e "$REPL_DIR/target" ]; then
+  echo "Failed to find Spark classes in $REPL_DIR/target" >&2
+  echo "You need to compile Spark before running this program" >&2
+  exit 1
+fi
+
 # Build up classpath
 CLASSPATH="$SPARK_CLASSPATH"
 CLASSPATH+=":$FWDIR/conf"
diff --git a/run2.cmd b/run2.cmd
index 83464b1166f2289dd1e0a589afe2ce2f5182f851..67f1e465e47b37d0b1a63cc6d2a22e2993462147 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -1,6 +1,6 @@
 @echo off
 
-set SCALA_VERSION=2.9.1
+set SCALA_VERSION=2.9.2
 
 rem Figure out where the Spark framework is installed
 set FWDIR=%~dp0