diff --git a/bagel/pom.xml b/bagel/pom.xml
index 89282161eaabac5bd7dce6aedff93916f1b51fd7..b83a0ef6c0f375e9faab544b44e4731786ff9194 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -4,7 +4,7 @@
   <parent>
     <groupId>org.spark-project</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.8.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala
index e10c03f6bad1c8640143afc8808c90afb16ca7d6..5ecdd7d0045fae68116792eae13da43dbdf1bb39 100644
--- a/bagel/src/main/scala/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -7,8 +7,7 @@ import scala.collection.mutable.ArrayBuffer
 import storage.StorageLevel
 
 object Bagel extends Logging {
-
-  val DEFAULT_STORAGE_LEVEL  = StorageLevel.MEMORY_ONLY
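+  // With MEMORY_AND_DISK, partitions that do not fit in memory are spilled to disk instead of being dropped and recomputed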
+  val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK
 
   /**
    * Runs a Bagel program.
@@ -63,8 +62,9 @@ object Bagel extends Logging {
       val combinedMsgs = msgs.combineByKey(
         combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
       val grouped = combinedMsgs.groupWith(verts)
+      val superstep_ = superstep  // Create a read-only copy of superstep for capture in closure
       val (processed, numMsgs, numActiveVerts) =
-        comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel)
+        comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
 
       val timeTaken = System.currentTimeMillis - startTime
       logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 25db395c22128013d259aae8722d567e3c0ff76f..a09c97806869ef707a6b374a0c253d9f81410519 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -23,6 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
     }
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   test("halting by voting") {
diff --git a/bin/spark-daemon.sh b/bin/spark-daemon.sh
index 0c584055c794685255450b041ab6a8c7b9d9b893..8ee3ec481fe0b90005d6a65b89117ecb84171bd9 100755
--- a/bin/spark-daemon.sh
+++ b/bin/spark-daemon.sh
@@ -30,7 +30,7 @@
 #   SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
 ##
 
-usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <args...>"
+usage="Usage: spark-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <spark-command> <spark-instance-number> <args...>"
 
 # if no args specified, show usage
 if [ $# -le 1 ]; then
@@ -48,6 +48,8 @@ startStop=$1
 shift
 command=$1
 shift
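+# The instance number distinguishes multiple daemons of the same type on one host; it is used in the log and pid file names below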
+instance=$1
+shift
 
 spark_rotate_log ()
 {
@@ -92,10 +94,10 @@ if [ "$SPARK_PID_DIR" = "" ]; then
 fi
 
 # some variables
-export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.log
+export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.log
 export SPARK_ROOT_LOGGER="INFO,DRFA"
-log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$HOSTNAME.out
-pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command.pid
+log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out
+pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid
 
 # Set default scheduling priority
 if [ "$SPARK_NICENESS" = "" ]; then
diff --git a/bin/spark-daemons.sh b/bin/spark-daemons.sh
index 4f9719ee809e800bef0055681a433b350a6ca2b4..0619097e4dc3a512920e8f70b0cbffe2ab75c3d3 100755
--- a/bin/spark-daemons.sh
+++ b/bin/spark-daemons.sh
@@ -2,7 +2,7 @@
 
 # Run a Spark command on all slave hosts.
 
-usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command args..."
+usage="Usage: spark-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command instance-number args..."
 
 # if no args specified, show usage
 if [ $# -le 1 ]; then
diff --git a/bin/start-master.sh b/bin/start-master.sh
index 87feb261fe86bb498eedcf40c1d98b3773cf3576..83a3e1f3dc1a3caa04b83d60d9dd78f506db583c 100755
--- a/bin/start-master.sh
+++ b/bin/start-master.sh
@@ -32,4 +32,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then
     fi
 fi
 
-"$bin"/spark-daemon.sh start spark.deploy.master.Master --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
+"$bin"/spark-daemon.sh start spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
diff --git a/bin/start-slave.sh b/bin/start-slave.sh
index 45a0cf7a6b7ac526fb0651e6a3f12f4cbfca8b51..616c76e4ee6e01eecbcfaf241ec87bd7e9dc9554 100755
--- a/bin/start-slave.sh
+++ b/bin/start-slave.sh
@@ -11,4 +11,4 @@ if [ "$SPARK_PUBLIC_DNS" = "" ]; then
     fi
 fi
 
-"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1
+"$bin"/spark-daemon.sh start spark.deploy.worker.Worker "$@"
diff --git a/bin/start-slaves.sh b/bin/start-slaves.sh
index 390247ca4aa4916e0f53b001c6cb3927df02da65..4e05224190e3b5edd17d173578691fca03dd51fa 100755
--- a/bin/start-slaves.sh
+++ b/bin/start-slaves.sh
@@ -21,4 +21,13 @@ fi
 echo "Master IP: $SPARK_MASTER_IP"
 
 # Launch the slaves
-exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+  exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" 1 spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
+else
+  if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
+    SPARK_WORKER_WEBUI_PORT=8081
+  fi
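+  # Start one worker per requested instance, giving each instance a distinct web UI port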
+  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+    "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" $(( $i + 1 ))  spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i ))
+  done
+fi
diff --git a/bin/stop-master.sh b/bin/stop-master.sh
index f75167dd2c72d9352140b47d6ae074850364a0c2..172ee5891d17f6d6fb76c9ff12c258ca14edcbff 100755
--- a/bin/stop-master.sh
+++ b/bin/stop-master.sh
@@ -7,4 +7,4 @@ bin=`cd "$bin"; pwd`
 
 . "$bin/spark-config.sh"
 
-"$bin"/spark-daemon.sh stop spark.deploy.master.Master
\ No newline at end of file
+"$bin"/spark-daemon.sh stop spark.deploy.master.Master 1
diff --git a/bin/stop-slaves.sh b/bin/stop-slaves.sh
index 21c9ebf324fdc69f6ab1680a75f8b5ed198d28ab..fbfc594472fe7522c30a6989e171aa4fa3396c7f 100755
--- a/bin/stop-slaves.sh
+++ b/bin/stop-slaves.sh
@@ -7,4 +7,14 @@ bin=`cd "$bin"; pwd`
 
 . "$bin/spark-config.sh"
 
-"$bin"/spark-daemons.sh stop spark.deploy.worker.Worker
\ No newline at end of file
+if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
+  . "${SPARK_CONF_DIR}/spark-env.sh"
+fi
+
+if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
+  "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker 1
+else
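+  # Stop each worker instance launched by start-slaves.sh, addressed by its instance number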
+  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
+    "$bin"/spark-daemons.sh stop spark.deploy.worker.Worker $(( $i + 1 ))
+  done
+fi
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index 6d71ec56914c3e1fa418ff069e1dbaa372bd6db2..37565ca827980d3d2b48312dfa30bb9d3fd10cea 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -12,6 +12,7 @@
 # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
 # - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
 # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
+# - SPARK_WORKER_INSTANCES, to set the number of worker instances/processes to be spawned on every slave machine
 #
 # Finally, Spark also relies on the following variables, but these can be set
 # on just the *master* (i.e. in your driver program), and will automatically
diff --git a/core/pom.xml b/core/pom.xml
index 7f65ce5c004af3d09e429f0ad118e5cfda5018ca..da26d674ec02e782316bd302d8f0718242605c31 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -4,7 +4,7 @@
   <parent>
     <groupId>org.spark-project</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.8.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 33dc7627a3dc3636a32478f9c9b9defe9ee210a9..ccd9d0364ad9582bf319dda96fb57566a5da60ae 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -366,7 +366,7 @@ abstract class RDD[T: ClassManifest](
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
    */
-  @deprecated("use mapPartitionsWithIndex")
+  @deprecated("use mapPartitionsWithIndex", "0.7.0")
   def mapPartitionsWithSplit[U: ClassManifest](
     f: (Int, Iterator[T]) => Iterator[U],
     preservesPartitioning: Boolean = false): RDD[U] =
diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
index d00092e9845e2a888de7d7ca03ba23d90ae50ba9..57e0405fb4185a3bcd6336deb45f659170c1f2bb 100644
--- a/core/src/main/scala/spark/RDDCheckpointData.scala
+++ b/core/src/main/scala/spark/RDDCheckpointData.scala
@@ -1,6 +1,7 @@
 package spark
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
 import rdd.{CheckpointRDD, CoalescedRDD}
 import scheduler.{ResultTask, ShuffleMapTask}
 
@@ -62,14 +63,20 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
       }
     }
 
+    // Create the output path for the checkpoint
+    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
+    val fs = path.getFileSystem(new Configuration())
+    if (!fs.mkdirs(path)) {
+      throw new SparkException("Failed to create checkpoint path " + path)
+    }
+
     // Save to file, and reload it as an RDD
-    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id).toString
-    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _)
-    val newRDD = new CheckpointRDD[T](rdd.context, path)
+    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
+    val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
 
     // Change the dependencies and partitions of the RDD
     RDDCheckpointData.synchronized {
-      cpFile = Some(path)
+      cpFile = Some(path.toString)
       cpRDD = Some(newRDD)
       rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
       cpState = Checkpointed
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 6b4a11d6d3f9efc994ab0ccced9b529348542677..518034e07bf172ac96cd515b43543415c274cbf9 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -36,17 +36,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
     self: RDD[(K, V)])
   extends Logging
   with Serializable {
-  
+
   private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
     val c = {
-      if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { 
+      if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
         classManifest[T].erasure
       } else {
         // We get the type of the Writable class by looking at the apply method which converts
         // from T to Writable. Since we have two apply methods we filter out the one which
-        // is of the form "java.lang.Object apply(java.lang.Object)"
+        // is not of the form "java.lang.Object apply(java.lang.Object)"
         implicitly[T => Writable].getClass.getDeclaredMethods().filter(
-            m => m.getReturnType().toString != "java.lang.Object" &&
+            m => m.getReturnType().toString != "class java.lang.Object" &&
                  m.getName() == "apply")(0).getReturnType
 
       }
@@ -69,17 +69,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
     val valueClass = getWritableClass[V]
     val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
     val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
-  
-    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" ) 
+
+    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
     val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
     if (!convertKey && !convertValue) {
-      self.saveAsHadoopFile(path, keyClass, valueClass, format) 
+      self.saveAsHadoopFile(path, keyClass, valueClass, format)
     } else if (!convertKey && convertValue) {
-      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) 
+      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
     } else if (convertKey && !convertValue) {
-      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format) 
+      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
     } else if (convertKey && convertValue) {
-      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) 
-    } 
+      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+    }
   }
 }
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 79d00edee78ecba5f7571e72022b20c61d9c5ecf..43ee39c993a3c0b254fe35bbbf9bcd8fb167e10c 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -131,6 +131,6 @@ private[spark] object CheckpointRDD extends Logging {
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
     assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
-    fs.delete(path)
+    fs.delete(path, true)
   }
 }
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 9213513e80914fd7ad67df953e80f04ddc1f07da..a6235491cab9657074a6de200d5b4cc3d5321341 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -29,7 +29,7 @@ private[spark] case class NarrowCoGroupSplitDep(
 private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
 
 private[spark]
-class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep])
+class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
   extends Partition with Serializable {
   override val index: Int = idx
   override def hashCode(): Int = idx
@@ -88,7 +88,7 @@ class CoGroupedRDD[K](
           case _ =>
             new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
         }
-      }.toList)
+      }.toArray)
     }
     array
   }
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 51f02409b6a75d689159970e59af52f887fc8626..4e33b7dd5ca844b4dc425b72fd1f09a2893e5e1d 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -16,7 +16,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  * @tparam V the value class.
  */
 class ShuffledRDD[K, V](
-    prev: RDD[(K, V)],
+    @transient prev: RDD[(K, V)],
     part: Partitioner)
   extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part))) {
 
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 0a025610626779209c575caadbcd2067fb34a232..481e03b349af0d7681ef5d25d6f3ac07bfec69f5 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -56,7 +56,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
           case _ =>
             new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
         }
-      }.toList)
+      }.toArray)
     }
     array
   }
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index e80ec17aa5e0a4bbffd94b835fb18cff597c1f00..35b0e06785cf370b04a718b4874cf2302bdf99c3 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -10,17 +10,17 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
     @transient rdd2: RDD[U]
   ) extends Partition {
 
-  var split1 = rdd1.partitions(idx)
-  var split2 = rdd1.partitions(idx)
+  var partition1 = rdd1.partitions(idx)
+  var partition2 = rdd2.partitions(idx)
   override val index: Int = idx
 
-  def splits = (split1, split2)
+  def partitions = (partition1, partition2)
 
   @throws(classOf[IOException])
   private def writeObject(oos: ObjectOutputStream) {
-    // Update the reference to parent split at the time of task serialization
-    split1 = rdd1.partitions(idx)
-    split2 = rdd2.partitions(idx)
+    // Update the reference to parent partition at the time of task serialization
+    partition1 = rdd1.partitions(idx)
+    partition2 = rdd2.partitions(idx)
     oos.defaultWriteObject()
   }
 }
@@ -43,13 +43,13 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
   }
 
   override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = {
-    val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits
-    rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context))
+    val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+    rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context))
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] = {
-    val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits
-    rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2))
+    val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+    rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
   }
 
   override def clearDependencies() {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 3ce1e6e25779623732db2c3f13ebdb5033a56bd0..9b64f95df80731cd8188edb5a2a159410ccda153 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -121,7 +121,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     val toRemove = new HashSet[BlockManagerId]
     for (info <- blockManagerInfo.values) {
       if (info.lastSeenMs < minSeenTime) {
-        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
+        logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
+          (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
         toRemove += info.blockManagerId
       }
     }
diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala
index ff00dd05dd7875e78f426b4e4f9b6bec20cfd079..76d5258b02b9712fae6d5a5935f6036a1a23fb44 100644
--- a/core/src/test/scala/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/spark/LocalSparkContext.scala
@@ -27,6 +27,7 @@ object LocalSparkContext {
     sc.stop()
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
@@ -38,4 +39,4 @@ object LocalSparkContext {
     }
   }
 
-}
\ No newline at end of file
+}
diff --git a/docs/_config.yml b/docs/_config.yml
index f99d5bb376027823a57749e6e5e07b4ac04a69f7..5c135a024215b0bfc697a938a592f7518f121428 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -3,8 +3,8 @@ markdown: kramdown
 
 # These allow the documentation to be updated with nerw releases
 # of Spark, Scala, and Mesos.
-SPARK_VERSION: 0.7.1-SNAPSHOT
-SPARK_VERSION_SHORT: 0.7.1
-SCALA_VERSION: 2.9.2
+SPARK_VERSION: 0.8.0-SNAPSHOT
+SPARK_VERSION_SHORT: 0.8.0
+SCALA_VERSION: 2.9.3
 MESOS_VERSION: 0.9.0-incubating
 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index c2eeafd07af751f4a8768e70b23e60ad0fad6a21..04cd79d039ec1b645f41400b7bf86d0cfa80475f 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -42,10 +42,10 @@ To run a specific test suite:
 
 You might run into the following errors if you're using a vanilla installation of Maven:
 
-    [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes...
+    [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
     [ERROR] PermGen space -> [Help 1]
 
-    [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes...
+    [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_VERSION}}/classes...
     [ERROR] Java heap space -> [Help 1]
 
 To fix these, you can do the following:
diff --git a/docs/index.md b/docs/index.md
index 51d505e1fa8c969397d969bc26fddd792f46f4aa..0c4add45dcd2adb6a5b4783a670afb985a6b8ce0 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -18,7 +18,7 @@ or you will need to set the `SCALA_HOME` environment variable to point
 to where you've installed Scala. Scala must also be accessible through one
 of these methods on slave nodes on your cluster.
 
-Spark uses [Simple Build Tool](https://github.com/harrah/xsbt/wiki), which is bundled with it. To compile the code, go into the top-level Spark directory and run
+Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with it. To compile the code, go into the top-level Spark directory and run
 
     sbt/sbt package
 
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 5c80d2ed3a52eb0772c30b363c45fd4ee7e6d929..335643536aac959386cf8a915f5912d393a06d2f 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -53,8 +53,8 @@ scala> textFile.filter(line => line.contains("Spark")).count() // How many lines
 res3: Long = 15
 {% endhighlight %}
 
-## Transformations
-RDD transformations can be used for more complex computations. Let's say we want to find the line with the most words:
+## More On RDD Operations
+RDD actions and transformations can be used for more complex computations. Let's say we want to find the line with the most words:
 
 {% highlight scala %}
 scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
@@ -113,8 +113,8 @@ import SparkContext._
 
 object SimpleJob {
   def main(args: Array[String]) {
-    val logFile = "/var/log/syslog" // Should be some file on your system
-    val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
+    val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
+    val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
       List("target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar"))
     val logData = sc.textFile(logFile, 2).cache()
     val numAs = logData.filter(line => line.contains("a")).count()
@@ -124,7 +124,7 @@ object SimpleJob {
 }
 {% endhighlight %}
 
-This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the job. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the job, the directory where Spark is installed, and a name for the jar file containing the job's sources. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes.
+This job simply counts the number of lines containing 'a' and the number containing 'b' in the Spark README. Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the job. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the job, the directory where Spark is installed, and a name for the jar file containing the job's sources. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes.
 
 This file depends on the Spark API, so we'll also include an sbt configuration file, `simple.sbt` which explains that Spark is a dependency. This file also adds two repositories which host Spark dependencies:
 
@@ -156,7 +156,7 @@ $ find .
 $ sbt package
 $ sbt run
 ...
-Lines with a: 8422, Lines with b: 1836
+Lines with a: 46, Lines with b: 23
 {% endhighlight %}
 
 This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS.
@@ -173,7 +173,7 @@ import spark.api.java.function.Function;
 
 public class SimpleJob {
   public static void main(String[] args) {
-    String logFile = "/var/log/syslog"; // Should be some file on your system
+    String logFile = "$YOUR_SPARK_HOME/README.md"; // Should be some file on your system
     JavaSparkContext sc = new JavaSparkContext("local", "Simple Job",
       "$YOUR_SPARK_HOME", new String[]{"target/simple-project-1.0.jar"});
     JavaRDD<String> logData = sc.textFile(logFile).cache();
@@ -191,7 +191,7 @@ public class SimpleJob {
 }
 {% endhighlight %}
 
-This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. Note that like in the Scala example, we initialize a SparkContext, though we use the special `JavaSparkContext` class to get a Java-friendly one. We also create RDDs (represented by `JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes that extend `spark.api.java.function.Function`. The [Java programming guide](java-programming-guide.html) describes these differences in more detail.
+This job simply counts the number of lines containing 'a' and the number containing 'b' in the Spark README. Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala example, we initialize a SparkContext, though we use the special `JavaSparkContext` class to get a Java-friendly one. We also create RDDs (represented by `JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes that extend `spark.api.java.function.Function`. The [Java programming guide](java-programming-guide.html) describes these differences in more detail.
 
 To build the job, we also write a Maven `pom.xml` file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version.
 
@@ -239,7 +239,7 @@ Now, we can execute the job using Maven:
 $ mvn package
 $ mvn exec:java -Dexec.mainClass="SimpleJob"
 ...
-Lines with a: 8422, Lines with b: 1836
+Lines with a: 46, Lines with b: 23
 {% endhighlight %}
 
 This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS.
@@ -253,7 +253,7 @@ As an example, we'll create a simple Spark job, `SimpleJob.py`:
 """SimpleJob.py"""
 from pyspark import SparkContext
 
-logFile = "/var/log/syslog"  # Should be some file on your system
+logFile = "$YOUR_SPARK_HOME/README.md"  # Should be some file on your system
 sc = SparkContext("local", "Simple job")
 logData = sc.textFile(logFile).cache()
 
@@ -265,7 +265,8 @@ print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
 
 
 This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file.
-Like in the Scala and Java examples, we use a SparkContext to create RDDs.
+Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. 
+As with the Scala and Java examples, we use a SparkContext to create RDDs.
 We can pass Python functions to Spark, which are automatically serialized along with any variables that they reference.
 For jobs that use custom classes or third-party libraries, we can add those code dependencies to SparkContext to ensure that they will be available on remote machines; this is described in more detail in the [Python programming guide](python-programming-guide.html).
 `SimpleJob` is simple enough that we do not need to specify any code dependencies.
@@ -276,7 +277,7 @@ We can run this job using the `pyspark` script:
 $ cd $SPARK_HOME
 $ ./pyspark SimpleJob.py
 ...
-Lines with a: 8422, Lines with b: 1836
+Lines with a: 46, Lines with b: 23
 {% endhighlight python %}
 
 This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS.
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index b30699cf3df8615b0e9360a751849aed76da3342..f5788dc46776d6709f1991195aefe2e742a35841 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -83,7 +83,7 @@ DStreams support many of the transformations available on normal Spark RDD's:
 <tr>
   <td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
   <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together all the values of each key in the RDDs of the source DStream. <br />
-  <b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
+  <b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
 </td>
 </tr>
 <tr>
@@ -132,7 +132,7 @@ Spark Streaming features windowed computations, which allow you to apply transfo
   <td> <b>groupByKeyAndWindow</b>(<i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>])
   </td>
   <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together values of each key over batches in a sliding window. <br />
-<b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.</td>
+<b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.</td>
 </tr>
 <tr>
   <td> <b>reduceByKeyAndWindow</b>(<i>func</i>, <i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>]) </td>
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 571d27fde66ed21d00aacdda1f587af381e5fefb..9f2daad2b632333b9dc288493d5ec52d908531db 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -540,11 +540,24 @@ def scp(host, opts, local_file, dest_file):
       (opts.identity_file, local_file, opts.user, host, dest_file), shell=True)
 
 
-# Run a command on a host through ssh, throwing an exception if ssh fails
+# Run a command on a host through ssh, retrying up to three times
+# and then throwing an exception if ssh continues to fail.
 def ssh(host, opts, command):
-  subprocess.check_call(
-      "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
-      (opts.identity_file, opts.user, host, command), shell=True)
+  tries = 0
+  while True:
+    try:
+      return subprocess.check_call(
+        "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
+        (opts.identity_file, opts.user, host, command), shell=True)
+    except subprocess.CalledProcessError as e:
+      if (tries > 2):
+        raise e
+      print "Error connecting to host {0}, sleeping 30".format(e)
+      time.sleep(30)
+      tries = tries + 1
 
 
 # Gets a list of zones to launch instances in
diff --git a/examples/pom.xml b/examples/pom.xml
index 9594257ad40367fb69738ca0b8090938b63bb269..c42d2bcdb9ed55bcae303fa8844a8827e8a07ba1 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -4,7 +4,7 @@
   <parent>
     <groupId>org.spark-project</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.8.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -22,7 +22,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>algebird-core_2.9.2</artifactId>
-      <version>0.1.8</version>
+      <version>0.1.11</version>
     </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala
index b07e799cef5afcfb855d64da4858326e17c4d8e7..4849f216fb2933faa17542897a584a70d88c2fda 100644
--- a/examples/src/main/scala/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala
@@ -10,73 +10,73 @@ import scala.collection.mutable.HashSet
  * K-means clustering.
  */
 object LocalKMeans {
-	val N = 1000
-	val R = 1000   	// Scaling factor
-	val D = 10
-	val K = 10
-	val convergeDist = 0.001
-	val rand = new Random(42)
-  	
-	def generateData = {
-	    def generatePoint(i: Int) = {
-	      Vector(D, _ => rand.nextDouble * R)
-	    }
-	    Array.tabulate(N)(generatePoint)
-	  }
-	
-	def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
-		var index = 0
-		var bestIndex = 0
-		var closest = Double.PositiveInfinity
-	
-		for (i <- 1 to centers.size) {
-			val vCurr = centers.get(i).get
-			val tempDist = p.squaredDist(vCurr)
-			if (tempDist < closest) {
-				closest = tempDist
-				bestIndex = i
-			}
-		}
-	
-		return bestIndex
-	}
-
-	def main(args: Array[String]) {
-	  val data = generateData
-		var points = new HashSet[Vector]
-		var kPoints = new HashMap[Int, Vector]
-		var tempDist = 1.0
-		
-		while (points.size < K) {
-			points.add(data(rand.nextInt(N)))
-		}
-		
-		val iter = points.iterator
-		for (i <- 1 to points.size) {
-			kPoints.put(i, iter.next())
-		}
-
-		println("Initial centers: " + kPoints)
-
-		while(tempDist > convergeDist) {
-			var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
-			
-			var mappings = closest.groupBy[Int] (x => x._1)
-			
-			var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
-			
-			var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
-			
-			tempDist = 0.0
-			for (mapping <- newPoints) {
-				tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
-			}
-			
-			for (newP <- newPoints) {
-				kPoints.put(newP._1, newP._2)
-			}
-		}
-
-		println("Final centers: " + kPoints)
-	}
+  val N = 1000
+  val R = 1000    // Scaling factor
+  val D = 10
+  val K = 10
+  val convergeDist = 0.001
+  val rand = new Random(42)
+
+  def generateData = {
+    def generatePoint(i: Int) = {
+      Vector(D, _ => rand.nextDouble * R)
+    }
+    Array.tabulate(N)(generatePoint)
+  }
+
+  def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+    var index = 0
+    var bestIndex = 0
+    var closest = Double.PositiveInfinity
+
+    for (i <- 1 to centers.size) {
+      val vCurr = centers.get(i).get
+      val tempDist = p.squaredDist(vCurr)
+      if (tempDist < closest) {
+        closest = tempDist
+        bestIndex = i
+      }
+    }
+
+    return bestIndex
+  }
+
+  def main(args: Array[String]) {
+    val data = generateData
+    var points = new HashSet[Vector]
+    var kPoints = new HashMap[Int, Vector]
+    var tempDist = 1.0
+
+    while (points.size < K) {
+      points.add(data(rand.nextInt(N)))
+    }
+
+    val iter = points.iterator
+    for (i <- 1 to points.size) {
+      kPoints.put(i, iter.next())
+    }
+
+    println("Initial centers: " + kPoints)
+
+    while(tempDist > convergeDist) {
+      var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+
+      var mappings = closest.groupBy[Int] (x => x._1)
+
+      var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
+
+      var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
+
+      tempDist = 0.0
+      for (mapping <- newPoints) {
+        tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
+      }
+
+      for (newP <- newPoints) {
+        kPoints.put(newP._1, newP._2)
+      }
+    }
+
+    println("Final centers: " + kPoints)
+  }
 }
diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
index 92cd81c48742fb7fe1e7c598512d5bd91e0dc5ee..a0aaf609186b74813f010ea419465102ecdfd0d0 100644
--- a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
@@ -8,7 +8,7 @@ object MultiBroadcastTest {
       System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]")
       System.exit(1)
     }
-    
+
     val sc = new SparkContext(args(0), "Broadcast Test",
       System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
 
@@ -19,7 +19,7 @@ object MultiBroadcastTest {
     for (i <- 0 until arr1.length) {
       arr1(i) = i
     }
-    
+
     var arr2 = new Array[Int](num)
     for (i <- 0 until arr2.length) {
       arr2(i) = i
@@ -30,7 +30,7 @@ object MultiBroadcastTest {
     sc.parallelize(1 to 10, slices).foreach {
       i => println(barr1.value.size + barr2.value.size)
     }
-    
+
     System.exit(0)
   }
 }
diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
index 0d17bda004b6e2f1fc4d5b43f703dd11f97c79e8..461b84a2c66232d7a2c01e31b97b9ded377909a2 100644
--- a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
@@ -11,7 +11,7 @@ object SimpleSkewedGroupByTest {
         "[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]")
       System.exit(1)
     }  
-    
+
     var numMappers = if (args.length > 1) args(1).toInt else 2
     var numKVPairs = if (args.length > 2) args(2).toInt else 1000
     var valSize = if (args.length > 3) args(3).toInt else 1000
@@ -20,7 +20,7 @@ object SimpleSkewedGroupByTest {
 
     val sc = new SparkContext(args(0), "GroupBy Test",
       System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
-    
+
     val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
       val ranGen = new Random
       var result = new Array[(Int, Array[Byte])](numKVPairs)
diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
index 83be3fc27b5b0a5c3ad2549f0b443136417d55a6..435675f9de489d65988fded440e91084a820cdf0 100644
--- a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
@@ -10,7 +10,7 @@ object SkewedGroupByTest {
       System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
       System.exit(1)
     }  
-    
+
     var numMappers = if (args.length > 1) args(1).toInt else 2
     var numKVPairs = if (args.length > 2) args(2).toInt else 1000
     var valSize = if (args.length > 3) args(3).toInt else 1000
@@ -18,7 +18,7 @@ object SkewedGroupByTest {
 
     val sc = new SparkContext(args(0), "GroupBy Test",
       System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
-    
+
     val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
       val ranGen = new Random
 
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 483aae452b05ef5a0ef32903cdf53bae41e1a7cb..a9642100e3d4886acd337e82e9ca6af049197410 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -49,7 +49,7 @@ object TwitterAlgebirdCMS {
 
     val users = stream.map(status => status.getUser.getId)
 
-    val cms = new CountMinSketchMonoid(DELTA, EPS, SEED, PERC)
+    val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
     var globalCMS = cms.zero
     val mm = new MapMonoid[Long, Int]()
     var globalExact = Map[Long, Int]()
diff --git a/pom.xml b/pom.xml
index 12e310a03838fe855f3ab583b03e4e3efa23340f..c3323ffad0f71dbe9b84efac787b6179bdd34c7b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.spark-project</groupId>
   <artifactId>spark-parent</artifactId>
-  <version>0.7.1-SNAPSHOT</version>
+  <version>0.8.0-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>Spark Project Parent POM</name>
   <url>http://spark-project.org/</url>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 947ac47f6bb6220bd9c099634a63211ae3ca1bd0..7c004df6fb8b23a30ce4fcff5d5a37cee6ed27f3 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -48,12 +48,13 @@ object SparkBuild extends Build {
     scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"),
     unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
     retrieveManaged := true,
+    retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
     transitiveClassifiers in Scope.GlobalScope := Seq("sources"),
     testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))),
 
     // Fork new JVMs for tests and set Java options for those
     fork := true,
-    javaOptions += "-Xmx1g",
+    javaOptions += "-Xmx2g",
 
     // Shared between both core and streaming.
     resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
@@ -125,6 +126,9 @@ object SparkBuild extends Build {
 
   val slf4jVersion = "1.6.1"
 
+  val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson")
+  val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
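+  // Exclusion rules applied to several dependencies below to keep transitive Jackson and Netty artifacts off the classpath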
+
   def coreSettings = sharedSettings ++ Seq(
     name := "spark-core",
     resolvers ++= Seq(
@@ -145,33 +149,33 @@ object SparkBuild extends Build {
       "asm" % "asm-all" % "3.3.1",
       "com.google.protobuf" % "protobuf-java" % "2.4.1",
       "de.javakaffee" % "kryo-serializers" % "0.22",
-      "com.typesafe.akka" % "akka-actor" % "2.0.3",
-      "com.typesafe.akka" % "akka-remote" % "2.0.3",
-      "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
+      "com.typesafe.akka" % "akka-actor" % "2.0.3" excludeAll(excludeNetty),
+      "com.typesafe.akka" % "akka-remote" % "2.0.3" excludeAll(excludeNetty),
+      "com.typesafe.akka" % "akka-slf4j" % "2.0.3" excludeAll(excludeNetty),
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
-      "cc.spray" % "spray-can" % "1.0-M2.1",
-      "cc.spray" % "spray-server" % "1.0-M2.1",
-      "cc.spray" % "spray-json_2.9.2" % "1.1.1",
+      "cc.spray" % "spray-can" % "1.0-M2.1" excludeAll(excludeNetty),
+      "cc.spray" % "spray-server" % "1.0-M2.1" excludeAll(excludeNetty),
+      "cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty),
       "org.apache.mesos" % "mesos" % "0.9.0-incubating"
     ) ++ (
       if (HADOOP_MAJOR_VERSION == "2") {
         if (HADOOP_YARN) {
           Seq(
             // Exclude rule required for all ?
-            "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ),
-            "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ),
-            "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ),
-            "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") )
+            "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty),
+            "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty),
+            "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty),
+            "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty)
           )
         } else {
           Seq(
-            "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ),
-            "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") )
+            "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty),
+            "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty)
           )
         }
       } else {
-        Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll( ExclusionRule(organization = "org.codehaus.jackson") ) )
+        Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) )
       }),
     unmanagedSourceDirectories in Compile <+= baseDirectory{ _ /
       ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") {
@@ -201,16 +205,17 @@ object SparkBuild extends Build {
   def streamingSettings = sharedSettings ++ Seq(
     name := "spark-streaming",
     libraryDependencies ++= Seq(
-      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
+      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty),
       "com.github.sgroschupf" % "zkclient" % "0.1",
-      "org.twitter4j" % "twitter4j-stream" % "3.0.3",
-      "com.typesafe.akka" % "akka-zeromq" % "2.0.3"
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
+      "com.typesafe.akka" % "akka-zeromq" % "2.0.3" excludeAll(excludeNetty)
     )
   ) ++ assemblySettings ++ extraAssemblySettings
 
   def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
     mergeStrategy in assembly := {
       case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
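+      // Signature files from signed dependency jars are not valid for the merged assembly jar, so drop them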
+      case m if m.toLowerCase.matches("meta-inf/.*\\.sf$") => MergeStrategy.discard
       case "reference.conf" => MergeStrategy.concat
       case _ => MergeStrategy.first
     }
diff --git a/project/build.properties b/project/build.properties
index d4287112c6afb76c00419432dbc7aa79945f09ee..9b860e23c51a6a794509c549acde3ce9ab007233 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=0.11.3
+sbt.version=0.12.3
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 4d0e696a113581cad40119265e1e86f67ec0ab9a..d4f244287236343a0d475849bb3d1ff4c3b6d152 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,13 +4,13 @@ resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/release
 
 resolvers += "Spray Repository" at "http://repo.spray.cc/"
 
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.3")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5")
 
-addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1")
+addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")
 
-addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0")
+addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")
 
-addSbtPlugin("cc.spray" %% "sbt-twirl" % "0.5.2")
+addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1")
 
 // For Sonatype publishing
 //resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py
index 73f7f8fbafcc5d577be823691187bc51940c9a18..7f85a1008e9f440660afb1bffc37ddaaa44849a4 100644
--- a/python/examples/transitive_closure.py
+++ b/python/examples/transitive_closure.py
@@ -24,7 +24,7 @@ if __name__ == "__main__":
             "Usage: PythonTC <master> [<slices>]"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonTC")
-    slices = sys.argv[2] if len(sys.argv) > 2 else 2
+    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
     tc = sc.parallelize(generateGraph(), slices).cache()
 
     # Linear transitive closure: each round grows paths by one edge,
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 172ed85fab9267b7cfffbec8c0474701c9faf23f..a9fec17a9da69fb4ce1f7153d4ee31e60a6cb2f8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -399,7 +399,7 @@ class RDD(object):
         >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
         >>> from fileinput import input
         >>> from glob import glob
-        >>> ''.join(input(glob(tempFile.name + "/part-0000*")))
+        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
         '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
         """
         def func(split, iterator):
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index 46f38c277231f0f1a1430bce8debaac5c557aad6..7a7280313edb7ab9f1bc5bb218198cf78305e799 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -4,7 +4,7 @@
   <parent>
     <groupId>org.spark-project</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.8.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
diff --git a/repl/pom.xml b/repl/pom.xml
index 1f885673f463a2a780a911a590d66ce8c24d8af2..038da5d9881c6fef98fc7cbe76137b66b0031123 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -4,7 +4,7 @@
   <parent>
     <groupId>org.spark-project</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.8.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala
index cd7b5128b24df21611df32598db2d2cc95b74b03..23556dbc8f3b9388c4ce99a080b6785ef844c501 100644
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
       ____              __  
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
-   /___/ .__/\_,_/_/ /_/\_\   version 0.7.1
+   /___/ .__/\_,_/_/ /_/\_\   version 0.8.0
       /_/                  
 """)
     import Properties._
diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala
index 43559b96d3a807a220053264cc3f34fb68af0c07..1c64f9b98d099d618eedffc559f4cfc7114bfa1f 100644
--- a/repl/src/test/scala/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuite.scala
@@ -32,6 +32,7 @@ class ReplSuite extends FunSuite {
       interp.sparkContext.stop()
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
     return out.toString
   }
   
diff --git a/run b/run
index 2c29cc4a6641cd6f11f80f5485b34ab27528006e..756f8703f2502541cf4c93a254b0f4d60577feb3 100755
--- a/run
+++ b/run
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-SCALA_VERSION=2.9.2
+SCALA_VERSION=2.9.3
 
 # Figure out where the Scala framework is installed
 FWDIR="$(cd `dirname $0`; pwd)"
@@ -22,6 +22,7 @@ fi
 # values for that; it doesn't need a lot
 if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
   SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
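+  # Daemons also log Akka actor lifecycle events to make master/worker failures easier to debug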
+  SPARK_DAEMON_JAVA_OPTS+=" -Dspark.akka.logLifecycleEvents=true"
   SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS   # Empty by default
 fi
 
@@ -46,14 +47,15 @@ case "$1" in
 esac
 
 if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
-  if [ `command -v scala` ]; then
-    RUNNER="scala"
+  if [ "$SCALA_HOME" ]; then
+    RUNNER="${SCALA_HOME}/bin/scala"
   else
-    if [ -z "$SCALA_HOME" ]; then
-      echo "SCALA_HOME is not set" >&2
+    if [ `command -v scala` ]; then
+      RUNNER="scala"
+    else
+      echo "SCALA_HOME is not set and scala is not in PATH" >&2
       exit 1
     fi
-    RUNNER="${SCALA_HOME}/bin/scala"
   fi
 else
   if [ `command -v java` ]; then
diff --git a/run2.cmd b/run2.cmd
index cb20a4b7a2f9a228cf66febbba429e5d39ecb60b..d2d4807971d295cd131651aab8a6de49dc0d88c6 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -1,6 +1,6 @@
 @echo off
 
-set SCALA_VERSION=2.9.2
+set SCALA_VERSION=2.9.3
 
 rem Figure out where the Spark framework is installed
 set FWDIR=%~dp0
@@ -21,6 +21,7 @@ set RUNNING_DAEMON=0
 if "%1"=="spark.deploy.master.Master" set RUNNING_DAEMON=1
 if "%1"=="spark.deploy.worker.Worker" set RUNNING_DAEMON=1
 if "x%SPARK_DAEMON_MEMORY%" == "x" set SPARK_DAEMON_MEMORY=512m
+set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
 if "%RUNNING_DAEMON%"=="1" set SPARK_MEM=%SPARK_DAEMON_MEMORY%
 if "%RUNNING_DAEMON%"=="1" set SPARK_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS%
 
diff --git a/sbt/sbt b/sbt/sbt
index 8f426d18e892facbc84fe1fe47edc8bc3a0f24ea..850c58e1e9745db4833f58748df11943c4a2b5f0 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -5,4 +5,4 @@ if [ "$MESOS_HOME" != "" ]; then
 fi
 export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd)
 export SPARK_TESTING=1  # To put test classes on classpath
-java -Xmx1200M -XX:MaxPermSize=250m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
+java -Xmx1200m -XX:MaxPermSize=250m -XX:ReservedCodeCacheSize=128m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
diff --git a/streaming/pom.xml b/streaming/pom.xml
index fc2e211a42eed41cec139e44e679387f42369ee2..08ff3e2ae12f48480d8d0526e6131254d127031e 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -4,7 +4,7 @@
   <parent>
     <groupId>org.spark-project</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.8.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index e303e33e5e4014e7b252b491edc9d5090dc5e88f..7bd104b8d5b9447ed2fe50503d2b1760d3ddd7c3 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -38,28 +38,43 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
 private[streaming]
 class CheckpointWriter(checkpointDir: String) extends Logging {
   val file = new Path(checkpointDir, "graph")
+  // The file we actually write to, which is then renamed to 'file'.
+  private val writeFile = new Path(file.getParent, file.getName + ".next")
+  private val bakFile = new Path(file.getParent, file.getName + ".bk")
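+  // 'file' is only replaced once a complete checkpoint has been written to writeFile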
+
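+  // Written by stop() from the caller's thread and read by the writer thread, hence @volatile.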
+  @volatile private var stopped = false
+
   val conf = new Configuration()
   var fs = file.getFileSystem(conf)
   val maxAttempts = 3
   val executor = Executors.newFixedThreadPool(1)
 
+  // Removed the check that only one CheckpointWriter exists per path 'file', since no errors were
+  // observed without it. Reintroduce it if duplicate writers turn out to be a problem.
+
   class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
     def run() {
       var attempts = 0
       val startTime = System.currentTimeMillis()
       while (attempts < maxAttempts) {
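+        // executor.shutdown() still runs queued tasks, so check 'stopped' and bail out explicitly.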
+        if (stopped) {
+          logInfo("Already stopped, ignore checkpoint attempt for " + file)
+          return
+        }
         attempts += 1
         try {
           logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
-          if (fs.exists(file)) {
-            val bkFile = new Path(file.getParent, file.getName + ".bk")
-            FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
-            logDebug("Moved existing checkpoint file to " + bkFile)
-          }
-          val fos = fs.create(file)
+          // Writing directly to 'file' is not thread safe, so write to the '.next' file first and then rename it into place; renames should be fast.
+          val fos = fs.create(writeFile)
           fos.write(bytes)
           fos.close()
-          fos.close()
+          if (fs.exists(file) && fs.rename(file, bakFile)) {
+            logDebug("Moved existing checkpoint file to " + bakFile)
+          }
+          // Defensive: make sure 'file' is gone so the rename below cannot fail.
+          fs.delete(file, false)
+          fs.rename(writeFile, file)
+
           val finishTime = System.currentTimeMillis();
           logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
             "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
@@ -84,6 +99,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
   }
 
   def stop() {
+    stopped = true
     executor.shutdown()
   }
 }
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index f673e5be15485b839d5f894912d3290eecc21637..e7a3f92bc0bb5e890bc94a90f8c5d0d0981c10a6 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -74,6 +74,7 @@ object MasterFailureTest extends Logging {
 
     val operation = (st: DStream[String]) => {
       val updateFunc = (values: Seq[Long], state: Option[Long]) => {
+        logInfo("UpdateFunc .. state = " + state.getOrElse(0L) + ", values = " + values)
         Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L))
       }
       st.flatMap(_.split(" "))
@@ -159,6 +160,7 @@ object MasterFailureTest extends Logging {
 
     // Setup the streaming computation with the given operation
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
     var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
     ssc.checkpoint(checkpointDir.toString)
     val inputStream = ssc.textFileStream(testDir.toString)
@@ -205,6 +207,7 @@ object MasterFailureTest extends Logging {
         // (iii) Its not timed out yet
         System.clearProperty("spark.streaming.clock")
         System.clearProperty("spark.driver.port")
+        System.clearProperty("spark.hostPort")
         ssc.start()
         val startTime = System.currentTimeMillis()
         while (!killed && !isLastOutputGenerated && !isTimedOut) {
@@ -357,13 +360,16 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
         // Write the data to a local file and then move it to the target test directory
         val localFile = new File(localTestDir, (i+1).toString)
         val hadoopFile = new Path(testDir, (i+1).toString)
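+        // Copy to a hidden ".tmp_" file and rename it afterwards, so the text file stream should never pick up a partially copied file.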
+        val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
         FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
         var tries = 0
 	var done = false
         while (!done && tries < maxTries) {
           tries += 1
           try {
-            fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+            fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
+            fs.rename(tempHadoopFile, hadoopFile)
 	    done = true
 	  } catch {
 	    case ioe: IOException => { 
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index cf2ed8b1d4bef11018e27785702cb44254527188..e7352deb81da3a140710d8fb0c223e50121ed7b1 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -15,6 +15,7 @@ class BasicOperationsSuite extends TestSuiteBase {
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   test("map") {
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index cac86deeaf3492cb298c26d9812cc82e71fd83cf..607dea77ec24b6bfd90d59571b702b4933239f91 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -31,6 +31,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
 
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   var ssc: StreamingContext = null
@@ -325,6 +326,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     )
     ssc = new StreamingContext(checkpointDir)
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
     ssc.start()
     val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
     // the first element will be re-processed data of the last batch before restart
@@ -350,4 +352,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
     outputStream.output
   }
-}
\ No newline at end of file
+}
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index a5fa7ab92dd6bf9564e1f33aba39d82a51deda04..4529e774e9b0a6abe5c59bd581abd20770e04bb4 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -22,10 +22,12 @@ class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
   val batchDuration = Milliseconds(1000)
 
   before {
+    logInfo("BEFORE ...")
     FileUtils.deleteDirectory(new File(directory))
   }
 
   after {
+    logInfo("AFTER ...")
     FileUtils.deleteDirectory(new File(directory))
   }
 
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 67dca2ac3121596d0b012ec0e749880a6db94e90..0acb6db6f2e484c213ea0bc15d291fb6f8a2dab6 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -41,6 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
 
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 1b66f3bda20ad0f112295e529f4a3f1419c167ed..80d827706f63c2cfdee90db2fe6f1bb5d114dd41 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -16,6 +16,7 @@ class WindowOperationsSuite extends TestSuiteBase {
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   val largerSlideInput = Seq(