diff --git a/assembly/pom.xml b/assembly/pom.xml index 808a829e192615f3fc2666fcca3576951c97cf52..d62332137a78a0e6934c3dc76a5c825de0ab8361 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/bagel/pom.xml b/bagel/pom.xml index 51173c32b2c152ddfb3166df80deccad86f31c1a..c4ce006085aeb1937bac2c0260b36fc0a90ffb81 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/core/pom.xml b/core/pom.xml index 14cd520aaf10eeb8d8908720e8ae63bac4b7fcdc..9c2d6046a99cd249e2f288dde0507db038f23f42 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d9be6f71f22f941b045058812ebea4ec037aa94f..2fb4a530720c29d0f9eca5319a1522f3992aee07 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -56,9 +56,9 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, - ClusterScheduler, Schedulable, SchedulingMode} + ClusterScheduler} import org.apache.spark.scheduler.local.LocalScheduler -import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} +import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.storage.{StorageUtils, BlockManagerSource} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap} @@ -256,7 +256,9 @@ class SparkContext( private[spark] var checkpointDir: Option[String] = None // Thread Local variable that can be used by users to pass information down the stack - private val localProperties = new ThreadLocal[Properties] + private val localProperties = new InheritableThreadLocal[Properties] { + override protected def childValue(parent: Properties): Properties = new Properties(parent) + } def initLocalProperties() { localProperties.set(new Properties()) @@ -273,6 +275,9 @@ class SparkContext( } } + def getLocalProperty(key: String): String = + Option(localProperties.get).map(_.getProperty(key)).getOrElse(null) + /** Set a human readable description of the current job. 
*/ def setJobDescription(value: String) { setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 7e6e691f11c9d1d57d9db3e0eac37d64f3de71ce..7a3568c5ef3e1ebef65733bed713e350ea920da2 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -67,6 +67,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def map[R](f: JFunction[T, R]): JavaRDD[R] = new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType()) + /** + * Return a new RDD by applying a function to each partition of this RDD, while tracking the index + * of the original partition. + */ + def mapPartitionsWithIndex[R: ClassManifest]( + f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]], + preservesPartitioning: Boolean = false): JavaRDD[R] = + new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))), + preservesPartitioning)) + /** * Return a new RDD by applying a function to all elements of this RDD. */ diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 1082cbae3e5193a36ed42d43a60d57c5f1f40e57..1893627ee2ce3360d0b5dfd78f00c9af2e38e54d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -753,24 +753,42 @@ abstract class RDD[T: ClassManifest]( } /** - * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so - * it will be slow if a lot of partitions are required. In that case, use collect() to get the - * whole RDD instead. + * Take the first num elements of the RDD. It works by first scanning one partition, and use the + * results from that partition to estimate the number of additional partitions needed to satisfy + * the limit. */ def take(num: Int): Array[T] = { if (num == 0) { return new Array[T](0) } + val buf = new ArrayBuffer[T] - var p = 0 - while (buf.size < num && p < partitions.size) { + val totalParts = this.partitions.length + var partsScanned = 0 + while (buf.size < num && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1 + if (partsScanned > 0) { + // If we didn't find any rows after the first iteration, just try all partitions next. + // Otherwise, interpolate the number of partitions we need to try, but overestimate it + // by 50%. 
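[Editor's note] The comment above describes the new growth rule that the next few lines of take() implement. Restated as a standalone helper for illustration only — the name nextNumPartsToTry is mine; the real logic is inlined in RDD.take just below:

```scala
// Given how many partitions were scanned so far and how many elements were
// collected, decide how many partitions to scan in the next round.
def nextNumPartsToTry(partsScanned: Int, collected: Int, num: Int, totalParts: Int): Int = {
  val estimate =
    if (partsScanned == 0) {
      1                              // first round: scan a single partition
    } else if (collected == 0) {
      totalParts - 1                 // first round found nothing: try all remaining partitions
    } else {
      // Interpolate from the observed density, overestimating by 50%.
      (1.5 * num * partsScanned / collected).toInt
    }
  math.max(0, estimate)              // guard against a negative partition count
}
```

For example, if the first partition yielded 10 of 100 requested elements, the next round scans roughly 1.5 * 100 * 1 / 10 = 15 partitions; as the comment above notes, runJob caps the range at the total partition count, so overshooting is harmless.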
+ if (buf.size == 0) { + numPartsToTry = totalParts - 1 + } else { + numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt + } + } + numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions + val left = num - buf.size - val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true) - buf ++= res(0) - if (buf.size == num) - return buf.toArray - p += 1 + val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) + val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true) + + res.foreach(buf ++= _.take(num - buf.size)) + partsScanned += numPartsToTry } + return buf.toArray } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index db998e499ae4c876d77b715ffe76484559db97fa..4053b911345645518d9c8a155cb3a1f88eca3254 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -28,7 +28,6 @@ import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.storage.{BlockManager, BlockManagerMaster} import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap} diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 0d996706489f0c3c50f04a1108d6a2a541c5f2ed..10ff1b4376268dbc96a8b1115df36eea5337cb0b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -19,7 +19,6 @@ package org.apache.spark.scheduler import java.util.Properties -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection.mutable.Map import org.apache.spark._ diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index c8b78bf00a83c0ffeded1835c080ecaa14b3a97e..3628b1b078de8b0f21ce95d14f2fe6bade88b0fa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -30,7 +30,6 @@ import scala.io.Source import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.executor.TaskMetrics -import org.apache.spark.scheduler.cluster.TaskInfo // Used to record runtime information for each job, including RDD graph // tasks' start/stop shuffle information and information from outside diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala similarity index 96% rename from core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala rename to core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 199a0521ff7f542d774fa62729c2bd1afe5d859c..9eb8d4850169677492bd37d00cc1ab1057822ae3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -15,13 +15,13 @@ * limitations under the License. 
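[Editor's note] Returning to the SparkContext changes near the top of this diff: localProperties is now an InheritableThreadLocal and getLocalProperty was added, so properties set on a thread are inherited by threads it spawns afterwards (the new ThreadingSuite tests further down exercise exactly this). A minimal sketch of that behavior, assuming a local-mode SparkContext; the key names "myJobTag" and "unsetKey" are purely illustrative:

```scala
import org.apache.spark.SparkContext

object LocalPropsSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "local-props-sketch")
    // Set a property on the parent (driver) thread.
    sc.setLocalProperty("myJobTag", "parent")

    val child = new Thread() {
      override def run() {
        // The child thread starts with a copy of the parent's properties,
        // because InheritableThreadLocal.childValue wraps them as defaults.
        assert(sc.getLocalProperty("myJobTag") == "parent")
        // Overriding the key here only affects this thread's copy.
        sc.setLocalProperty("myJobTag", "child")
      }
    }
    child.start()
    child.join()

    // The parent still sees its own value; unknown keys return null.
    assert(sc.getLocalProperty("myJobTag") == "parent")
    assert(sc.getLocalProperty("unsetKey") == null)
    sc.stop()
  }
}
```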
*/ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import org.apache.spark.Logging -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** * An Schedulable entity that represent collection of Pools or TaskSetManagers diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala similarity index 93% rename from core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala rename to core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala index 171549fbd9a5a1d935d43b7df1d132efe83b6e73..1c7ea2dccc7d9b60b735fa37c4cc604db8fe6158 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import scala.collection.mutable.ArrayBuffer /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala similarity index 64% rename from core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala rename to core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index f80823317bde7adac3af0b450ce7ba5c9af41080..4e25086ec91e03e7fa0f6e25d4c5e9e55fecca23 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -15,16 +15,14 @@ * limitations under the License. 
*/ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler -import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException} -import java.util.Properties - -import scala.xml.XML +import java.io.{FileInputStream, InputStream} +import java.util.{NoSuchElementException, Properties} import org.apache.spark.Logging -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import scala.xml.XML /** * An interface to build Schedulable tree @@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { - val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file") + val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file")) + val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" val DEFAULT_POOL_NAME = "default" val MINIMUM_SHARES_PROPERTY = "minShare" @@ -64,48 +63,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) val DEFAULT_WEIGHT = 1 override def buildPools() { - if (schedulerAllocFile != null) { - val file = new File(schedulerAllocFile) - if (file.exists()) { - val xml = XML.loadFile(file) - for (poolNode <- (xml \\ POOLS_PROPERTY)) { - - val poolName = (poolNode \ POOL_NAME_PROPERTY).text - var schedulingMode = DEFAULT_SCHEDULING_MODE - var minShare = DEFAULT_MINIMUM_SHARE - var weight = DEFAULT_WEIGHT - - val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text - if (xmlSchedulingMode != "") { - try { - schedulingMode = SchedulingMode.withName(xmlSchedulingMode) - } catch { - case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode") - } - } - - val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text - if (xmlMinShare != "") { - minShare = xmlMinShare.toInt - } - - val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text - if (xmlWeight != "") { - weight = xmlWeight.toInt - } - - val pool = new Pool(poolName, schedulingMode, minShare, weight) - rootPool.addSchedulable(pool) - logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( - poolName, schedulingMode, minShare, weight)) + var is: Option[InputStream] = None + try { + is = Option { + schedulerAllocFile.map { f => + new FileInputStream(f) + }.getOrElse { + getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) } - } else { - throw new java.io.FileNotFoundException( - "Fair scheduler allocation file not found: " + schedulerAllocFile) } + + is.foreach { i => buildFairSchedulerPool(i) } + } finally { + is.foreach(_.close()) } // finally create "default" pool + buildDefaultPool() + } + + private def buildDefaultPool() { if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) @@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) } } + private def buildFairSchedulerPool(is: InputStream) { + val xml = XML.load(is) + for (poolNode <- (xml \\ POOLS_PROPERTY)) { + + val poolName = (poolNode \ POOL_NAME_PROPERTY).text + var schedulingMode = DEFAULT_SCHEDULING_MODE + var minShare = DEFAULT_MINIMUM_SHARE + var weight = DEFAULT_WEIGHT + + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text + if (xmlSchedulingMode != "") { + try { + schedulingMode = SchedulingMode.withName(xmlSchedulingMode) + } catch { + case e: NoSuchElementException => 
+ logWarning("Error xml schedulingMode, using default schedulingMode") + } + } + + val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text + if (xmlMinShare != "") { + minShare = xmlMinShare.toInt + } + + val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text + if (xmlWeight != "") { + weight = xmlWeight.toInt + } + + val pool = new Pool(poolName, schedulingMode, minShare, weight) + rootPool.addSchedulable(pool) + logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( + poolName, schedulingMode, minShare, weight)) + } + } + override def addTaskSetManager(manager: Schedulable, properties: Properties) { var poolName = DEFAULT_POOL_NAME var parentPool = rootPool.getSchedulableByName(poolName) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala similarity index 98% rename from core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala rename to core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala index cbeed4731a79c3205890beaae31b322085a57369..3418640b8c59ec00c16af51ab876b08808a5c784 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler /** * An interface for sort algorithm diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala similarity index 96% rename from core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala rename to core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala index 34811389a02f7d27c6203a1ad6c77cbd90a03a6a..0a786deb161aed35b34fbf4f0d6d3d4864cddd19 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler /** * "FAIR" and "FIFO" determines which policy is used diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index c3cf4b8907ebba878fcfb65d894ab97da0a06af8..62b521ad45d39c6854f2eae64c4971e02f9de350 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -18,7 +18,6 @@ package org.apache.spark.scheduler import java.util.Properties -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.util.{Utils, Distribution} import org.apache.spark.{Logging, SparkContext, TaskEndReason} import org.apache.spark.executor.TaskMetrics diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 72cb1c9ce8a2fac9e184db8a30d8cb69d976fc76..b6f11969e575efbe889d13cf8ef2dcc7eaccfe0a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -17,8 +17,8 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection._ + import org.apache.spark.executor.TaskMetrics case class StageInfo( diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala similarity index 97% rename from core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala rename to core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 309ac2f6c9d0537bd178570debf2b7c315be59de..5190d234d4ee531926086c8ac3673d61806309c8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import java.nio.ByteBuffer import org.apache.spark.util.SerializableBuffer diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala similarity index 97% rename from core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala rename to core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 9685fb1a67b7ade2637ef78021077084eae50c9e..7c2a422affbbfbe4be65817121d22c06f1bb3dfd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala similarity index 96% rename from core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala rename to core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala index 5d4130e14a8ad1ab40f87111e56596c7ff217f0d..47b0f387aa0c5d63b05e25acdee9802dd1c9782f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler private[spark] object TaskLocality diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 63be8ba3f58ee584d7626c7113d3162ff61b251b..7c2a9f03d7ad894fcda8050da685c816738285a3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -17,10 +17,11 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.Pool -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode + /** * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler. + * Each TaskScheduler schedulers task for a single SparkContext. * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage, * and are responsible for sending the tasks to the cluster, running them, retrying if there * are failures, and mitigating stragglers. They return events to the DAGScheduler through diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala index 83be051c1a8092029f6bf13b2408ae77fb9fa75f..593fa9fb93a55624573750e611c265c5de18d8a0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler -import org.apache.spark.scheduler.cluster.TaskInfo import scala.collection.mutable.Map import org.apache.spark.TaskEndReason diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala similarity index 95% rename from core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala rename to core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a0f3758a242909ee3562683305f21d7d020f3ccf..90f6bcefac0bfab0e29196c963f3d9ef3ac29fa2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -15,12 +15,11 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState -import org.apache.spark.scheduler.TaskSet /** * Tracks and schedules the tasks within a single TaskSet. 
This class keeps track of the status of diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 299c0e1888908fd13198cede0dd5542a28b4d077..1a844b7e7ee4b1d356dfd515dec31726bc7d86ac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -29,8 +29,7 @@ import scala.collection.mutable.HashSet import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode - +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala index fc4da5df54aeea421cc991c637c7d0d01af15796..c7225de8705fba243423174a5735ff4ce20cedad 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -25,12 +25,12 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import scala.math.max import scala.math.min +import scala.Some -import org.apache.spark._ +import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv, + SparkException, Success, TaskEndReason, TaskResultTooBigFailure, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ -import org.apache.spark.FetchFailed -import org.apache.spark.ExceptionFailure import org.apache.spark.util.{SystemClock, Clock} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala index 9c36d221f69deeafc6c1786a972f879ac6c564a3..c0b836bf1a021737174323b65f24ff71048541bc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState +import org.apache.spark.scheduler.TaskDescription import org.apache.spark.util.{Utils, SerializableBuffer} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index b4ea0be415e58ac94c4a7b2419eaca99bc45c147..f3aeea43d575956aba4d6da26745c3bd469122fd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -29,6 +29,7 @@ import akka.util.Duration import akka.util.duration._ import org.apache.spark.{SparkException, Logging, TaskState} +import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._ import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala similarity index 98% rename from core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala rename to core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 3dbe61d7067f2fb808b22082efd21a6832410501..8f2eef9a535fc062c68ab664721750317dad9ddc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -15,22 +15,22 @@ * limitations under the License. */ -package org.apache.spark.scheduler.mesos +package org.apache.spark.scheduler.cluster.mesos -import com.google.protobuf.ByteString +import java.io.File +import java.util.{ArrayList => JArrayList, List => JList} +import java.util.Collections + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.JavaConversions._ +import com.google.protobuf.ByteString import org.apache.mesos.{Scheduler => MScheduler} import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} -import org.apache.spark.{SparkException, Logging, SparkContext} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.collection.JavaConversions._ -import java.io.File -import org.apache.spark.scheduler.cluster._ -import java.util.{ArrayList => JArrayList, List => JList} -import java.util.Collections -import org.apache.spark.TaskState +import org.apache.spark.{SparkException, Logging, SparkContext, TaskState} +import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala similarity index 97% rename from core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala rename to core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 541f86e3381db89a169b1ffb2e69e0d98cecd20f..50cbc2ca92e7e5435cf86673827af165dd29f24f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -15,22 +15,24 @@ * limitations under the License. 
*/ -package org.apache.spark.scheduler.mesos +package org.apache.spark.scheduler.cluster.mesos -import com.google.protobuf.ByteString +import java.io.File +import java.util.{ArrayList => JArrayList, List => JList} +import java.util.Collections + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.JavaConversions._ +import com.google.protobuf.ByteString import org.apache.mesos.{Scheduler => MScheduler} import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} -import org.apache.spark.{SparkException, Logging, SparkContext} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.collection.JavaConversions._ -import java.io.File -import org.apache.spark.scheduler.cluster._ -import java.util.{ArrayList => JArrayList, List => JList} -import java.util.Collections -import org.apache.spark.TaskState +import org.apache.spark.{Logging, SparkException, SparkContext, TaskState} +import org.apache.spark.scheduler.TaskDescription +import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason} +import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer} import org.apache.spark.util.Utils /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala index bcf9e1baf24ab87c25fc3b662da800182187edb0..4d1bb1c63973c26069809f0ca7ea044eb673c4f1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala @@ -31,8 +31,7 @@ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster._ -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import akka.actor._ import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala index de0fd5a5285cd960e51bf93f753caa7a03be5354..dc4cf555deeea3461aa7fd3aa962b65784ebb536 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala @@ -23,9 +23,8 @@ import scala.collection.mutable.HashMap import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success, TaskState} import org.apache.spark.TaskState.TaskState -import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskResult, TaskSet} -import org.apache.spark.scheduler.cluster.{Pool, Schedulable, TaskDescription, TaskInfo} -import org.apache.spark.scheduler.cluster.{TaskLocality, TaskSetManager} +import org.apache.spark.scheduler.{Schedulable, Task, TaskDescription, TaskInfo, TaskLocality, + TaskResult, TaskSet, TaskSetManager} private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 3b3b2342fa213e8db7f5b1a18ee692016b241148..77a39c71ed2c1d22a18a3af76c4cf5b7ef96ca5b 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -30,10 +30,10 @@ import org.apache.spark.util.{SizeEstimator, Utils} private class MemoryStore(blockManager: BlockManager, maxMemory: Long) extends BlockStore(blockManager) { - case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false) + case class Entry(value: Any, size: Long, deserialized: Boolean) private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true) - private var currentMemory = 0L + @volatile private var currentMemory = 0L // Object used to ensure that only one thread is putting blocks and if necessary, dropping // blocks from the memory store. private val putLock = new Object() @@ -110,9 +110,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def remove(blockId: String): Boolean = { entries.synchronized { - val entry = entries.get(blockId) + val entry = entries.remove(blockId) if (entry != null) { - entries.remove(blockId) currentMemory -= entry.size logInfo("Block %s of size %d dropped from memory (free %d)".format( blockId, entry.size, freeMemory)) @@ -126,6 +125,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def clear() { entries.synchronized { entries.clear() + currentMemory = 0 } logInfo("MemoryStore cleared") } @@ -160,8 +160,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) putLock.synchronized { if (ensureFreeSpace(blockId, size)) { val entry = new Entry(value, size, deserialized) - entries.synchronized { entries.put(blockId, entry) } - currentMemory += size + entries.synchronized { + entries.put(blockId, entry) + currentMemory += size + } if (deserialized) { logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 3ec9760ed0306d0f14116471c1379ed9f381edce..453394dfda1ba0a5f317d81896fd2663fffa9e59 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -21,7 +21,7 @@ import scala.util.Random import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ -import org.apache.spark.scheduler.cluster.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode /** diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index d1868dcf786284107c10e2e6eca1504875aba37e..42e9be6e19254a77715d269dae4a584ba0691b26 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -26,8 +26,8 @@ import org.eclipse.jetty.server.Handler import org.apache.spark.{ExceptionFailure, Logging, SparkContext} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener} +import org.apache.spark.scheduler.TaskInfo import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.Page.Executors import org.apache.spark.ui.UIUtils diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 
3b428effafad5c650708672cf9a5df23e3f3d1ed..b39c0e9769d48ee27b7d3c96277fad601f1d0c73 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.{NodeSeq, Node} -import org.apache.spark.scheduler.cluster.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.ui.Page._ import org.apache.spark.ui.UIUtils._ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 5d46f38a2a4b972f24919975c753309eb3967f33..eb3b4e8522804492e6f9ce0412f48d3b3f42c8ab 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -21,10 +21,8 @@ import scala.Seq import scala.collection.mutable.{ListBuffer, HashMap, HashSet} import org.apache.spark.{ExceptionFailure, SparkContext, Success} -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.TaskInfo import org.apache.spark.executor.TaskMetrics -import collection.mutable +import org.apache.spark.scheduler._ /** * Tracks task-level information to be displayed in the UI. diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala index 6aecef5120451a58909c64e9bd5f558332316b00..e7eab374ad7e0f09288d066c1100fadaf628d3cb 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala @@ -32,8 +32,8 @@ import org.apache.spark.ui.JettyUtils._ import org.apache.spark.{ExceptionFailure, SparkContext, Success} import org.apache.spark.scheduler._ import collection.mutable -import org.apache.spark.scheduler.cluster.SchedulingMode -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils /** Web UI showing progress status of all jobs in the given SparkContext. 
*/ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index b3d3666944b4c7de1ff4ae84dc008f8c354a4cb2..06810d8dbc2926f8ededb16d58ca4767804ed41d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -21,8 +21,7 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import scala.xml.Node -import org.apache.spark.scheduler.Stage -import org.apache.spark.scheduler.cluster.Schedulable +import org.apache.spark.scheduler.{Schedulable, Stage} import org.apache.spark.ui.UIUtils /** Table showing list of pools */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index a9969ab1c0870bc52d86bcf78ab4dc7699a4709d..163a3746ea00ef5d92ce6345f11c66bb1999e456 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -23,12 +23,12 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node +import org.apache.spark.{ExceptionFailure} +import org.apache.spark.executor.TaskMetrics import org.apache.spark.ui.UIUtils._ import org.apache.spark.ui.Page._ import org.apache.spark.util.{Utils, Distribution} -import org.apache.spark.{ExceptionFailure} -import org.apache.spark.scheduler.cluster.TaskInfo -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.TaskInfo /** Page showing statistics and task list for a given stage */ private[spark] class StagePage(parent: JobProgressUI) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 32776eaa25ae3106c75345828f2b5b602a27be11..07db8622da4718be7a73bd56d708eed4d2bd6036 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -22,8 +22,7 @@ import java.util.Date import scala.xml.Node import scala.collection.mutable.HashSet -import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo} -import org.apache.spark.scheduler.Stage +import org.apache.spark.scheduler.{SchedulingMode, Stage, TaskInfo} import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala index 6ec124da9c7b17a933717e0f195119fc558b9f5f..459e257d79a36a4a12d3e5c1f0de6490e09106ca 100644 --- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala @@ -40,17 +40,17 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self } def resetSparkContext() = { - if (sc != null) { - LocalSparkContext.stop(sc) - sc = null - } + LocalSparkContext.stop(sc) + sc = null } } object LocalSparkContext { def stop(sc: SparkContext) { - sc.stop() + if (sc != null) { + sc.stop() + } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") System.clearProperty("spark.hostPort") diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala index 97cbca09bfa2687939894b8cf5281b4cbbfa6c95..288aa14eeb03b2bcc7be42bd751716260bb90a11 100644 --- 
a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala @@ -33,10 +33,8 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite => } override def afterAll() { - if (_sc != null) { - LocalSparkContext.stop(_sc) - _sc = null - } + LocalSparkContext.stop(_sc) + _sc = null super.afterAll() } } diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala index 69383ddfb8bc7471653ad5d9234fa45f9bbb9737..75d6493e338fe71683316538a32ca679357a447d 100644 --- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala +++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala @@ -40,7 +40,7 @@ object ThreadingSuiteState { } class ThreadingSuite extends FunSuite with LocalSparkContext { - + test("accessing SparkContext form a different thread") { sc = new SparkContext("local", "test") val nums = sc.parallelize(1 to 10, 2) @@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext { fail("One or more threads didn't see runningThreads = 4") } } + + test("set local properties in different thread") { + sc = new SparkContext("local", "test") + val sem = new Semaphore(0) + + val threads = (1 to 5).map { i => + new Thread() { + override def run() { + sc.setLocalProperty("test", i.toString) + assert(sc.getLocalProperty("test") === i.toString) + sem.release() + } + } + } + + threads.foreach(_.start()) + + sem.acquire(5) + assert(sc.getLocalProperty("test") === null) + } + + test("set and get local properties in parent-children thread") { + sc = new SparkContext("local", "test") + sc.setLocalProperty("test", "parent") + val sem = new Semaphore(0) + + val threads = (1 to 5).map { i => + new Thread() { + override def run() { + assert(sc.getLocalProperty("test") === "parent") + sc.setLocalProperty("test", i.toString) + assert(sc.getLocalProperty("test") === i.toString) + sem.release() + } + } + } + + threads.foreach(_.start()) + + sem.acquire(5) + assert(sc.getLocalProperty("test") === "parent") + assert(sc.getLocalProperty("Foo") === null) + } } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index c1df5e151e8acca7073a50209cfa4b5460795262..6d1bc5e296e06beb137673088229da3750c0579c 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -25,7 +25,6 @@ import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import scala.collection.parallel.mutable import org.apache.spark._ -import org.apache.spark.rdd.CoalescedRDDPartition class RDDSuite extends FunSuite with SharedSparkContext { @@ -321,6 +320,44 @@ class RDDSuite extends FunSuite with SharedSparkContext { for (i <- 0 until sample.size) assert(sample(i) === checkSample(i)) } + test("take") { + var nums = sc.makeRDD(Range(1, 1000), 1) + assert(nums.take(0).size === 0) + assert(nums.take(1) === Array(1)) + assert(nums.take(3) === Array(1, 2, 3)) + assert(nums.take(500) === (1 to 500).toArray) + assert(nums.take(501) === (1 to 501).toArray) + assert(nums.take(999) === (1 to 999).toArray) + assert(nums.take(1000) === (1 to 999).toArray) + + nums = sc.makeRDD(Range(1, 1000), 2) + assert(nums.take(0).size === 0) + assert(nums.take(1) === Array(1)) + assert(nums.take(3) === Array(1, 2, 3)) + assert(nums.take(500) === (1 to 500).toArray) + assert(nums.take(501) === (1 to 501).toArray) + assert(nums.take(999) === (1 to 
999).toArray) + assert(nums.take(1000) === (1 to 999).toArray) + + nums = sc.makeRDD(Range(1, 1000), 100) + assert(nums.take(0).size === 0) + assert(nums.take(1) === Array(1)) + assert(nums.take(3) === Array(1, 2, 3)) + assert(nums.take(500) === (1 to 500).toArray) + assert(nums.take(501) === (1 to 501).toArray) + assert(nums.take(999) === (1 to 999).toArray) + assert(nums.take(1000) === (1 to 999).toArray) + + nums = sc.makeRDD(Range(1, 1000), 1000) + assert(nums.take(0).size === 0) + assert(nums.take(1) === Array(1)) + assert(nums.take(3) === Array(1, 2, 3)) + assert(nums.take(500) === (1 to 500).toArray) + assert(nums.take(501) === (1 to 501).toArray) + assert(nums.take(999) === (1 to 999).toArray) + assert(nums.take(1000) === (1 to 999).toArray) + } + test("top with predefined ordering") { val nums = Array.range(1, 100000) val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 94f66c94c685a0bcbc057eb831adacfbc5f7c6f7..9ed591e494a1648dcfec3faaaf5dbd59a9612728 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -32,9 +32,9 @@ import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency} import org.apache.spark.{FetchFailed, Success, TaskEndReason} import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster} -import org.apache.spark.scheduler.cluster.Pool -import org.apache.spark.scheduler.cluster.SchedulingMode -import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode +import org.apache.spark.scheduler.Pool +import org.apache.spark.scheduler.SchedulingMode +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler diff --git a/docs/_config.yml b/docs/_config.yml index b061764b36817cf2d6fde9ede6ffc897e6fab0c3..48ecb8d0c9bf28660ea2df2bd1e58a527f5f8c7e 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -3,8 +3,8 @@ markdown: kramdown # These allow the documentation to be updated with nerw releases # of Spark, Scala, and Mesos. 
-SPARK_VERSION: 0.8.0-SNAPSHOT -SPARK_VERSION_SHORT: 0.8.0 +SPARK_VERSION: 0.9.0-incubating-SNAPSHOT +SPARK_VERSION_SHORT: 0.9.0-SNAPSHOT SCALA_VERSION: 2.9.3 MESOS_VERSION: 0.13.0 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 6b7d202a88174e8b9046463e3e75324ba43b99d8..1190ed47f6bcc4502fa597c704d2c2c870d1b319 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -23,6 +23,7 @@ from __future__ import with_statement import logging import os +import pipes import random import shutil import subprocess @@ -36,6 +37,9 @@ import boto from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType from boto import ec2 +class UsageError(Exception): + pass + # A URL prefix from which to fetch AMI information AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list" @@ -103,11 +107,7 @@ def parse_args(): parser.print_help() sys.exit(1) (action, cluster_name) = args - if opts.identity_file == None and action in ['launch', 'login', 'start']: - print >> stderr, ("ERROR: The -i or --identity-file argument is " + - "required for " + action) - sys.exit(1) - + # Boto config check # http://boto.cloudhackers.com/en/latest/boto_config_tut.html home_dir = os.getenv('HOME') @@ -390,10 +390,18 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): master = master_nodes[0].public_dns_name if deploy_ssh_key: - print "Copying SSH key %s to master..." % opts.identity_file - ssh(master, opts, 'mkdir -p ~/.ssh') - scp(master, opts, opts.identity_file, '~/.ssh/id_rsa') - ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa') + print "Generating cluster's SSH key on master..." + key_setup = """ + [ -f ~/.ssh/id_rsa ] || + (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa && + cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys) + """ + ssh(master, opts, key_setup) + dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) + print "Transferring cluster's SSH key to slaves..." 
+ for slave in slave_nodes: + print slave.public_dns_name + ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar) modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs', 'mapreduce', 'spark-standalone'] @@ -535,18 +543,33 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): dest.write(text) dest.close() # rsync the whole directory over to the master machine - command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " + - "'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master)) - subprocess.check_call(command, shell=True) + command = [ + 'rsync', '-rv', + '-e', stringify_command(ssh_command(opts)), + "%s/" % tmp_dir, + "%s@%s:/" % (opts.user, active_master) + ] + subprocess.check_call(command) # Remove the temp directory we created above shutil.rmtree(tmp_dir) -# Copy a file to a given host through scp, throwing an exception if scp fails -def scp(host, opts, local_file, dest_file): - subprocess.check_call( - "scp -q -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" % - (opts.identity_file, local_file, opts.user, host, dest_file), shell=True) +def stringify_command(parts): + if isinstance(parts, str): + return parts + else: + return ' '.join(map(pipes.quote, parts)) + + +def ssh_args(opts): + parts = ['-o', 'StrictHostKeyChecking=no'] + if opts.identity_file is not None: + parts += ['-i', opts.identity_file] + return parts + + +def ssh_command(opts): + return ['ssh'] + ssh_args(opts) # Run a command on a host through ssh, retrying up to two times @@ -556,18 +579,42 @@ def ssh(host, opts, command): while True: try: return subprocess.check_call( - "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" % - (opts.identity_file, opts.user, host, command), shell=True) + ssh_command(opts) + ['-t', '%s@%s' % (opts.user, host), stringify_command(command)]) except subprocess.CalledProcessError as e: if (tries > 2): - raise e - print "Couldn't connect to host {0}, waiting 30 seconds".format(e) + # If this was an ssh failure, provide the user with hints. + if e.returncode == 255: + raise UsageError("Failed to SSH to remote host {0}.\nPlease check that you have provided the correct --identity-file and --key-pair parameters and try again.".format(host)) + else: + raise e + print >> stderr, "Error executing remote command, retrying after 30 seconds: {0}".format(e) time.sleep(30) tries = tries + 1 +def ssh_read(host, opts, command): + return subprocess.check_output( + ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)]) +def ssh_write(host, opts, command, input): + tries = 0 + while True: + proc = subprocess.Popen( + ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)], + stdin=subprocess.PIPE) + proc.stdin.write(input) + proc.stdin.close() + status = proc.wait() + if status == 0: + break + elif (tries > 2): + raise RuntimeError("ssh_write failed with error %s" % proc.returncode) + else: + print >> stderr, "Error {0} while executing remote command, retrying after 30 seconds".format(status) + time.sleep(30) + tries = tries + 1 + # Gets a list of zones to launch instances in def get_zones(conn, opts): @@ -586,7 +633,7 @@ def get_partition(total, num_partitions, current_partitions): return num_slaves_this_zone -def main(): +def real_main(): (opts, action, cluster_name) = parse_args() try: conn = ec2.connect_to_region(opts.region) @@ -669,11 +716,11 @@ def main(): conn, opts, cluster_name) master = master_nodes[0].public_dns_name print "Logging into master " + master + "..." 
- proxy_opt = "" + proxy_opt = [] if opts.proxy_port != None: - proxy_opt = "-D " + opts.proxy_port - subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s %s@%s" % - (opts.identity_file, proxy_opt, opts.user, master), shell=True) + proxy_opt = ['-D', opts.proxy_port] + subprocess.check_call( + ssh_command(opts) + proxy_opt + ['-t', "%s@%s" % (opts.user, master)]) elif action == "get-master": (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) @@ -715,6 +762,13 @@ def main(): sys.exit(1) +def main(): + try: + real_main() + except UsageError, e: + print >> stderr, "\nError:\n", e + + if __name__ == "__main__": logging.basicConfig() main() diff --git a/examples/pom.xml b/examples/pom.xml index e48f5b50abcc6f7f88bf6bdb40e42be822767539..b9cc6f5e0aa45ba52c2c385cd649737f760f401f 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/mllib/pom.xml b/mllib/pom.xml index 966caf6835561f0a876970aeaf2118c6568d16e4..4ef4f0ae4e0770013ec49b2c18c4d3d311b6dd56 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/pom.xml b/pom.xml index 4aed1260f06c8a56a01435d2a8c85523f4d1c1a6..ad5051d38a19ddfb456f6c359a1ab2716ad5c103 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ </parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <packaging>pom</packaging> <name>Spark Project Parent POM</name> <url>http://spark.incubator.apache.org/</url> @@ -557,7 +557,6 @@ <useZincServer>true</useZincServer> <args> <arg>-unchecked</arg> - <arg>-optimise</arg> <arg>-deprecation</arg> </args> <jvmArgs> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index ed7671757bfa102f944a5085f149041a25e3754e..aef246d8a9088382f21d5b6a8edcf273337a08ff 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -79,9 +79,9 @@ object SparkBuild extends Build { def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.apache.spark", - version := "0.8.0-SNAPSHOT", + version := "0.9.0-incubating-SNAPSHOT", scalaVersion := "2.9.3", - scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", + scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-deprecation", "-target:" + SCALAC_JVM_VERSION), javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index dc205b306f0a93d398d4b18c95da6f888a93ec6e..a47595909029754590cf2a4fac27e4d3b0d6b17a 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -35,7 +35,7 @@ print """Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /__ / .__/\_,_/_/ /_/\_\ version 0.8.0 + /__ / .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT /_/ """ print "Using Python version %s (%s, %s)" % ( diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index 3685561501d67c8c47f0109a1a66588ec8e3acf5..05aadc7bdf2f3ea984ab73297ae5ccd9db515600 100644 --- 
a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/repl/pom.xml b/repl/pom.xml index 3123b37780dc509c8e302b7fbe41f79610127ad6..2826c0743c75c5c9be5126159e928d0dc599407b 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 193ccb48eeb98827bb8d4f6f6df62209f7c5010a..36f54a22cf30449d421364ab74d8dcb2db459783 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /___/ .__/\_,_/_/ /_/\_\ version 0.8.0 + /___/ .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT /_/ """) import Properties._ diff --git a/spark-class b/spark-class index 037abda3b710ab260d385103b7579befc1be1164..e111ef6da7e7c40cbac450d62d4ab3cfcf6b960d 100755 --- a/spark-class +++ b/spark-class @@ -37,7 +37,7 @@ fi # If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable # values for that; it doesn't need a lot -if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then +if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.deploy.worker.Worker" ]; then SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true" # Do not overwrite SPARK_JAVA_OPTS environment variable in this script @@ -49,19 +49,19 @@ fi # Add java opts for master, worker, executor. 
The opts maybe null case "$1" in - 'spark.deploy.master.Master') + 'org.apache.spark.deploy.master.Master') OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS" ;; - 'spark.deploy.worker.Worker') + 'org.apache.spark.deploy.worker.Worker') OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS" ;; - 'spark.executor.StandaloneExecutorBackend') + 'org.apache.spark.executor.StandaloneExecutorBackend') OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" ;; - 'spark.executor.MesosExecutorBackend') + 'org.apache.spark.executor.MesosExecutorBackend') OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" ;; - 'spark.repl.Main') + 'org.apache.spark.repl.Main') OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS" ;; esac diff --git a/streaming/pom.xml b/streaming/pom.xml index 7bea069b6158a2292c47bd6f6c91bead6e9fd3f9..b260a72abb24ee821b366acb17c51452ecfa5226 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/tools/pom.xml b/tools/pom.xml index 77646a68165adea8bb810259f91ff368367f652d..29f001412829323cd3efc1562d077957a5f89953 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/yarn/pom.xml b/yarn/pom.xml index 21b650d1ea7ce5a21053515782a6f92d6f5cee9b..427fcdf545aaf371508d5ab61352e3f2eeb83cc2 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,7 +20,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> - <version>0.8.0-SNAPSHOT</version> + <version>0.9.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 844c707834ecf0ba770b2b09bee6b1af9de6cefd..33620101067b832e9fde98e8694ad90e84b5357e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -224,8 +224,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Add Xmx for am memory JAVA_OPTS += "-Xmx" + amMemory + "m " - JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + JAVA_OPTS += " -Djava.io.tmpdir=" + + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. 
@@ -241,6 +241,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 " JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 " } + if (env.isDefinedAt("SPARK_JAVA_OPTS")) { JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 6229167cb44263bc16c1e951dddadbc550ba17f3..a60e8a300786e31df2d7408d34a833b1deb54f28 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -77,8 +77,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } - JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + JAVA_OPTS += " -Djava.io.tmpdir=" + + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " + // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same