diff --git a/assembly/pom.xml b/assembly/pom.xml
index f9c8b9708a218217bc7b67dc6bcc6aa8656f987d..28b0692dff10350dab30a6863dee475e9fb5234f 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -26,7 +26,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-assembly</artifactId>
+  <artifactId>spark-assembly_${scala-short.version}</artifactId>
   <name>Spark Project Assembly</name>
   <url>http://spark.incubator.apache.org/</url>
 
@@ -41,27 +41,27 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-bagel</artifactId>
+      <artifactId>spark-bagel_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-mllib</artifactId>
+      <artifactId>spark-mllib_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-repl</artifactId>
+      <artifactId>spark-repl_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming</artifactId>
+      <artifactId>spark-streaming_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
@@ -104,13 +104,13 @@
             </goals>
             <configuration>
               <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                 <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                   <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
                 </transformer>
               </transformers>
               <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                 <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                   <resource>reference.conf</resource>
                 </transformer>
@@ -128,7 +128,7 @@
       <dependencies>
         <dependency>
           <groupId>org.apache.spark</groupId>
-          <artifactId>spark-yarn</artifactId>
+          <artifactId>spark-yarn_${scala-short.version}</artifactId>
           <version>${project.version}</version>
         </dependency>
       </dependencies>
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 271ab6ce56e8c3c950ea69a1178d689f5e6d7b43..c8b9c4f4cdd979c32724e642e75ad49515fe0927 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -26,7 +26,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-bagel</artifactId>
+  <artifactId>spark-bagel_${scala-short.version}</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project Bagel</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -34,7 +34,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
diff --git a/core/pom.xml b/core/pom.xml
index 2ec8fa6d0afa2f314b3c6efd7a725600ebe2f4d7..595240b5e5fd143160a2ea81ac3a7c94719be252 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -26,7 +26,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-core</artifactId>
+  <artifactId>spark-core_${scala-short.version}</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project Core</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -39,7 +39,6 @@
     <dependency>
       <groupId>net.java.dev.jets3t</groupId>
       <artifactId>jets3t</artifactId>
-      <version>0.7.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.avro</groupId>
@@ -199,14 +198,14 @@
             <configuration>
               <exportAntProperties>true</exportAntProperties>
               <tasks>
-                <property name="spark.classpath" refid="maven.test.classpath"/>
-                <property environment="env"/>
+                <property name="spark.classpath" refid="maven.test.classpath" />
+                <property environment="env" />
                 <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
                   <condition>
                     <not>
                       <or>
-                        <isset property="env.SCALA_HOME"/>
-                        <isset property="env.SCALA_LIBRARY_PATH"/>
+                        <isset property="env.SCALA_HOME" />
+                        <isset property="env.SCALA_LIBRARY_PATH" />
                       </or>
                     </not>
                   </condition>
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 68b99ca1253d5f3b64395f55bedc322c9dbb81b2..4cf7eb96da25ee283df9bc97a72a540542293cba 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -26,28 +26,29 @@ import org.apache.spark.rdd.RDD
     sure a node doesn't load two copies of an RDD at once.
   */
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
-  private val loading = new HashSet[String]
+
+  /** Keys of RDD splits that are being computed/loaded. */
+  private val loading = new HashSet[String]()
 
   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
       : Iterator[T] = {
     val key = "rdd_%d_%d".format(rdd.id, split.index)
-    logInfo("Cache key is " + key)
+    logDebug("Looking for partition " + key)
     blockManager.get(key) match {
-      case Some(cachedValues) =>
-        // Partition is in cache, so just return its values
-        logInfo("Found partition in cache!")
-        return cachedValues.asInstanceOf[Iterator[T]]
+      case Some(values) =>
+        // Partition is already materialized, so just return its values
+        return values.asInstanceOf[Iterator[T]]
 
       case None =>
         // Mark the split as loading (unless someone else marks it first)
         loading.synchronized {
           if (loading.contains(key)) {
-            logInfo("Loading contains " + key + ", waiting...")
+            logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
             while (loading.contains(key)) {
               try {loading.wait()} catch {case _ : Throwable =>}
             }
-            logInfo("Loading no longer contains " + key + ", so returning cached result")
+            logInfo("Finished waiting for %s".format(key))
             // See whether someone else has successfully loaded it. The main way this would fail
             // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
             // partition but we didn't want to make space for it. However, that case is unlikely
@@ -57,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
               case Some(values) =>
                 return values.asInstanceOf[Iterator[T]]
               case None =>
-                logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
+                logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
                 loading.add(key)
             }
           } else {
@@ -66,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         }
         try {
           // If we got here, we have to load the split
-          logInfo("Computing partition " + split)
+          logInfo("Partition %s not found, computing it".format(key))
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
           // Persist the result, so long as the task is not running locally
           if (context.runningLocally) { return computedValues }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 17c6f9c955654502e847adfb7993440bdff03164..efcc92e8e7bb794f06cb514162b70c0ff29b17c5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.LocalSparkCluster
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -85,9 +86,11 @@ class SparkContext(
     val sparkHome: String = null,
     val jars: Seq[String] = Nil,
     val environment: Map[String, String] = Map(),
-    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
-    // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
+    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc)
+    // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
+    // of data-local splits on host
+    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+      scala.collection.immutable.Map())
   extends Logging {
 
   // Ensure logging is initialized before we spawn any threads
@@ -240,7 +243,8 @@ class SparkContext(
     val env = SparkEnv.get
     val conf = env.hadoop.newConfiguration()
     // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
       conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
       conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
       conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
@@ -339,6 +343,8 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
+    // Add necessary security credentials to the JobConf before broadcasting it.
+    SparkEnv.get.hadoop.addCredentials(conf)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
@@ -349,10 +355,27 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
-      ) : RDD[(K, V)] = {
-    val conf = new JobConf(hadoopConfiguration)
-    FileInputFormat.setInputPaths(conf, path)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+      ): RDD[(K, V)] = {
+    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
+    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+    hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+  }
+
+  /**
+   * Get an RDD for a Hadoop file with an arbitray InputFormat. Accept a Hadoop Configuration
+   * that has already been broadcast, assuming that it's safe to use it to construct a
+   * HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be resued).
+   */
+  def hadoopFile[K, V](
+      path: String,
+      confBroadcast: Broadcast[SerializableWritable[Configuration]],
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minSplits: Int
+      ): RDD[(K, V)] = {
+    new HadoopFileRDD(
+      this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
index b090c6edf3c07adad0e796000cbc81194ebae6be..2be4e323bec9808338e8d48fd3463551ef504f69 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.api.python
 
-import org.apache.spark.Partitioner
 import java.util.Arrays
+
+import org.apache.spark.Partitioner
 import org.apache.spark.util.Utils
 
 /**
- * A [[org.apache.spark.Partitioner]] that performs handling of byte arrays, for use by the Python API.
+ * A [[org.apache.spark.Partitioner]] that performs handling of long-valued keys, for use by the Python API.
  *
  * Stores the unique id() of the Python-side partitioning function so that it is incorporated into
  * equality comparisons.  Correctness requires that the id is a unique identifier for the
@@ -30,6 +31,7 @@ import org.apache.spark.util.Utils
  * function).  This can be ensured by using the Python id() function and maintaining a reference
  * to the Python partitioning function so that its id() is not reused.
  */
+
 private[spark] class PythonPartitioner(
   override val numPartitions: Int,
   val pyPartitionFunctionId: Long)
@@ -37,7 +39,9 @@ private[spark] class PythonPartitioner(
 
   override def getPartition(key: Any): Int = key match {
     case null => 0
-    case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions)
+    // we don't trust the Python partition function to return valid partition ID's so
+    // let's do a modulo numPartitions in any case
+    case key: Long => Utils.nonNegativeMod(key.toInt, numPartitions)
     case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index cb2db77f39991f3d7cec2e6074c9ac4a47c967f5..4d887cf195265a5eeb7182d7b92880a82203925a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -187,14 +187,14 @@ private class PythonException(msg: String) extends Exception(msg)
  * This is used by PySpark's shuffle operations.
  */
 private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
-  RDD[(Array[Byte], Array[Byte])](prev) {
+  RDD[(Long, Array[Byte])](prev) {
   override def getPartitions = prev.partitions
   override def compute(split: Partition, context: TaskContext) =
     prev.iterator(split, context).grouped(2).map {
-      case Seq(a, b) => (a, b)
+      case Seq(a, b) => (Utils.deserializeLongValue(a), b)
       case x          => throw new SparkException("PairwiseRDD: unexpected value: " + x)
     }
-  val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
+  val asJavaPairRDD : JavaPairRDD[Long, Array[Byte]] = JavaPairRDD.fromRDD(this)
 }
 
 private[spark] object PythonRDD {
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index 87a703427caf18bb8860026c3d4f5673d81527bd..04d01c169d3e5d36d303599bf5cb150235546549 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -41,6 +41,7 @@ private[spark] object JsonProtocol {
     ("starttime" -> obj.startTime) ~
     ("id" -> obj.id) ~
     ("name" -> obj.desc.name) ~
+    ("appuiurl" -> obj.appUiUrl) ~
     ("cores" -> obj.desc.maxCores) ~
     ("user" ->  obj.desc.user) ~
     ("memoryperslave" -> obj.desc.memoryPerSlave) ~
@@ -64,7 +65,7 @@ private[spark] object JsonProtocol {
   }
 
   def writeMasterState(obj: MasterStateResponse) = {
-    ("url" -> ("spark://" + obj.uri)) ~
+    ("url" -> obj.uri) ~
     ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
     ("cores" -> obj.workers.map(_.cores).sum) ~
     ("coresused" -> obj.workers.map(_.coresUsed).sum) ~
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 0a5f4c368f3d1ef12374f5842f4bd7b0d0354817..993ba6bd3dbfad6faeed86d8d4f4ef39535dd00e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -16,6 +16,9 @@
  */
 
 package org.apache.spark.deploy
+
+import com.google.common.collect.MapMaker
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
@@ -24,11 +27,16 @@ import org.apache.hadoop.mapred.JobConf
  * Contains util methods to interact with Hadoop from spark.
  */
 class SparkHadoopUtil {
+  // A general, soft-reference map for metadata needed during HadoopRDD split computation
+  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
+  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
-  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop
+  // subsystems
   def newConfiguration(): Configuration = new Configuration()
 
-  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+  // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+  // cluster
   def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2cb6734e4199c2f50dcef278dbb23691c4cd3621..d3b3fffd409d17dfda16aca529596b064b9b1058 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
 
 import java.io.EOFException
 
+import org.apache.hadoop.mapred.FileInputFormat
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -26,10 +27,47 @@ import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils
 
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
+  TaskContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.util.NextIterator
 import org.apache.hadoop.conf.{Configuration, Configurable}
 
+/**
+ * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
+ * system, or S3).
+ * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
+ * across multiple reads; the 'path' is the only variable that is different across new JobConfs
+ * created from the Configuration.
+ */
+class HadoopFileRDD[K, V](
+    sc: SparkContext,
+    path: String,
+    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+    inputFormatClass: Class[_ <: InputFormat[K, V]],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int)
+  extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
+
+  override def getJobConf(): JobConf = {
+    if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
+      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+    } else {
+      // Create a new JobConf, set the input file/directory paths to read from, and cache the
+      // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
+      // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
+      // getJobConf() calls for this RDD in the local process.
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+      val newJobConf = new JobConf(broadcastedConf.value.value)
+      FileInputFormat.setInputPaths(newJobConf, path)
+      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+      return newJobConf
+    }
+  }
+}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
@@ -45,29 +83,80 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 }
 
 /**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
- * system, or S3, tables in HBase, etc).
+ * A base class that provides core functionality for reading data partitions stored in Hadoop.
  */
 class HadoopRDD[K, V](
     sc: SparkContext,
-    @transient conf: JobConf,
+    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
-  // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  def this(
+      sc: SparkContext,
+      conf: JobConf,
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minSplits: Int) = {
+    this(
+      sc,
+      sc.broadcast(new SerializableWritable(conf))
+        .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      minSplits)
+  }
+
+  protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
+  protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+
+  // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+  protected def getJobConf(): JobConf = {
+    val conf: Configuration = broadcastedConf.value.value
+    if (conf.isInstanceOf[JobConf]) {
+      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
+      return conf.asInstanceOf[JobConf]
+    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
+      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+    } else {
+      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+      val newJobConf = new JobConf(broadcastedConf.value.value)
+      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+      return newJobConf
+    }
+  }
+
+  protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+    if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
+      return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+    }
+    // Once an InputFormat for this RDD is created, cache it so that only one reflection call is
+    // done in each local process.
+    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+      .asInstanceOf[InputFormat[K, V]]
+    if (newInputFormat.isInstanceOf[Configurable]) {
+      newInputFormat.asInstanceOf[Configurable].setConf(conf)
+    }
+    HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
+    return newInputFormat
+  }
 
   override def getPartitions: Array[Partition] = {
-    val env = SparkEnv.get
-    env.hadoop.addCredentials(conf)
-    val inputFormat = createInputFormat(conf)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
     if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(conf)
+      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
     }
-    val inputSplits = inputFormat.getSplits(conf, minSplits)
+    val inputSplits = inputFormat.getSplits(jobConf, minSplits)
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -75,22 +164,14 @@ class HadoopRDD[K, V](
     array
   }
 
-  def createInputFormat(conf: JobConf): InputFormat[K, V] = {
-    ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
-      .asInstanceOf[InputFormat[K, V]]
-  }
-
   override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
     val split = theSplit.asInstanceOf[HadoopPartition]
     logInfo("Input split: " + split.inputSplit)
     var reader: RecordReader[K, V] = null
 
-    val conf = confBroadcast.value.value
-    val fmt = createInputFormat(conf)
-    if (fmt.isInstanceOf[Configurable]) {
-      fmt.asInstanceOf[Configurable].setConf(conf)
-    }
-    reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
+    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
     context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -127,5 +208,18 @@ class HadoopRDD[K, V](
     // Do nothing. Hadoop RDD should not be checkpointed.
   }
 
-  def getConf: Configuration = confBroadcast.value.value
+  def getConf: Configuration = getJobConf()
+}
+
+private[spark] object HadoopRDD {
+  /**
+   * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
+   * the local process.
+   */
+  def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+
+  def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+
+  def putCachedMetadata(key: String, value: Any) =
+    SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a5e792d8968813fb0a2144ca6ab28213785871ed..7852849ce55f09184b04c0f91226bdb94e58db7b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -523,7 +523,17 @@ private[spark] class BlockManager(
    * Get a block from the block manager (either local or remote).
    */
   def get(blockId: String): Option[Iterator[Any]] = {
-    getLocal(blockId).orElse(getRemote(blockId))
+    val local = getLocal(blockId)
+    if (local.isDefined) {
+      logInfo("Found block %s locally".format(blockId))
+      return local
+    }
+    val remote = getRemote(blockId)
+    if (remote.isDefined) {
+      logInfo("Found block %s remotely".format(blockId))
+      return remote
+    }
+    None
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index be215fc1272739a15bfbf4c69a4a9c3e0fe2fb21..94ce50e96463eed7cadc4edbff4d0e6c0910e8f0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -77,6 +77,19 @@ private[spark] object Utils extends Logging {
     return ois.readObject.asInstanceOf[T]
   }
 
+  /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */
+  def deserializeLongValue(bytes: Array[Byte]) : Long = {
+    // Note: we assume that we are given a Long value encoded in network (big-endian) byte order
+    var result = bytes(7) & 0xFFL
+    result = result + ((bytes(6) & 0xFFL) << 8)
+    result = result + ((bytes(5) & 0xFFL) << 16)
+    result = result + ((bytes(4) & 0xFFL) << 24)
+    result = result + ((bytes(3) & 0xFFL) << 32)
+    result = result + ((bytes(2) & 0xFFL) << 40)
+    result = result + ((bytes(1) & 0xFFL) << 48)
+    result + ((bytes(0) & 0xFFL) << 56)
+  }
+
   /** Serialize via nested stream using specific serializer */
   def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
     val osWrapper = ser.serializeStream(new OutputStream {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index aac7c207cbe0c8a6c3f3be75649815c9f57d058a..41a161e08ae8bc5874cdc2e89f8f99b00acd4254 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -29,7 +29,9 @@ import org.apache.spark.SparkContext._
 
 class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
 
-  test("local metrics") {
+  // TODO: This test has a race condition since the DAGScheduler now reports results
+  //       asynchronously. It needs to be updated for that patch.
+  ignore("local metrics") {
     sc = new SparkContext("local[4]", "test")
     val listener = new SaveStageInfo
     sc.addSparkListener(listener)
@@ -43,6 +45,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
     val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
     d.count
+    Thread.sleep(1000)
     listener.stageInfos.size should be (1)
 
     val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
@@ -54,6 +57,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
     d4.collectAsMap
 
+    Thread.sleep(1000)
     listener.stageInfos.size should be (4)
     listener.stageInfos.foreach {stageInfo =>
       //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index f4e1d4e802c7178eead535245131ccc1457d9c67..3764f4d1a0c9b9932e99698ee8162757b80b0425 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -25,6 +25,13 @@ import org.eclipse.jetty.server.Server
 class UISuite extends FunSuite {
   test("jetty port increases under contention") {
     val startPort = 4040
+    val server = new Server(startPort)
+
+    Try { server.start() } match {
+      case Success(s) => 
+      case Failure(e) => 
+      // Either case server port is busy hence setup for test complete
+    }
     val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
     val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
     // Allow some wiggle room in case ports on the machine are under contention
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index e2859caf58d5a9fbfbce963cd496e3bf740ad87b..4684c8c972d3c31fa7fd2cb223958640fe475d4a 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import com.google.common.base.Charsets
 import com.google.common.io.Files
 import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream, File}
+import java.nio.{ByteBuffer, ByteOrder}
 import org.scalatest.FunSuite
 import org.apache.commons.io.FileUtils
 import scala.util.Random
@@ -135,5 +136,15 @@ class UtilsSuite extends FunSuite {
 
     FileUtils.deleteDirectory(tmpDir2)
   }
+
+  test("deserialize long value") {
+    val testval : Long = 9730889947L
+    val bbuf = ByteBuffer.allocate(8)
+    assert(bbuf.hasArray)
+    bbuf.order(ByteOrder.BIG_ENDIAN)
+    bbuf.putLong(testval)
+    assert(bbuf.array.length === 8)
+    assert(Utils.deserializeLongValue(bbuf.array) === testval)
+  }
 }
 
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index 238ad26de0212f427d59eef1fcebc0c7744f7274..0c1d657cde470abbb0ca00c0cd266dbd2fe587bf 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -6,7 +6,7 @@
     <head>
         <meta charset="utf-8">
         <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
-        <title>{{ page.title }} - Spark {{site.SPARK_VERSION}} Documentation</title>
+        <title>{{ page.title }} - Spark {{site.SPARK_VERSION_SHORT}} Documentation</title>
         <meta name="description" content="">
 
         <link rel="stylesheet" href="css/bootstrap.min.css">
@@ -109,7 +109,7 @@
                             </ul>
                         </li>
                     </ul>
-                    <!--<p class="navbar-text pull-right"><span class="version-text">v{{site.SPARK_VERSION}}</span></p>-->
+                    <!--<p class="navbar-text pull-right"><span class="version-text">v{{site.SPARK_VERSION_SHORT}}</span></p>-->
                 </div>
             </div>
         </div>
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 1190ed47f6bcc4502fa597c704d2c2c870d1b319..65868b76b91fcfdd895ba7a14bbf2895407471e4 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -70,7 +70,7 @@ def parse_args():
            "slaves across multiple (an additional $0.01/Gb for bandwidth" +
            "between zones applies)")
   parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use")
-  parser.add_option("-v", "--spark-version", default="0.7.3",
+  parser.add_option("-v", "--spark-version", default="0.8.0",
       help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
   parser.add_option("--spark-git-repo", 
       default="https://github.com/mesos/spark", 
@@ -155,7 +155,7 @@ def is_active(instance):
 
 # Return correct versions of Spark and Shark, given the supplied Spark version
 def get_spark_shark_version(opts):
-  spark_shark_map = {"0.7.3": "0.7.0"}
+  spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0"}
   version = opts.spark_version.replace("v", "")
   if version not in spark_shark_map:
     print >> stderr, "Don't know about Spark version: %s" % version
diff --git a/examples/pom.xml b/examples/pom.xml
index 3c0a8d06edf32d2c10cc6b39be7a5796c76c26a1..c6c9def5be75569486dfb9a2b8e618b3521c58e1 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -26,33 +26,41 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-examples</artifactId>
+  <artifactId>spark-examples_${scala-short.version}</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project Examples</name>
   <url>http://spark.incubator.apache.org/</url>
 
+  <repositories>
+    <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 -->
+    <repository>
+      <id>lib</id>
+      <url>file://${project.basedir}/lib</url>
+    </repository>
+  </repositories>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_${scala-short.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming</artifactId>
+      <artifactId>spark-streaming_${scala-short.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-mllib</artifactId>
+      <artifactId>spark-mllib_${scala-short.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-bagel</artifactId>
+      <artifactId>spark-bagel_${scala-short.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
@@ -71,6 +79,12 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka</artifactId>
+      <version>0.7.2-spark</version>  <!-- Comes from our in-project repository -->
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
@@ -161,7 +175,7 @@
             </goals>
             <configuration>
               <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                 <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                   <resource>reference.conf</resource>
                 </transformer>
diff --git a/mllib/pom.xml b/mllib/pom.xml
index 4cff5e370017f0adadb3db68c108446f83f66278..a57bddeff3118d4c6c85a4c23088cb1b002da60f 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -26,7 +26,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-mllib</artifactId>
+  <artifactId>spark-mllib_${scala-short.version}</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project ML Library</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -34,7 +34,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
@@ -48,12 +48,12 @@
     </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_2.10</artifactId>
+      <artifactId>scalatest_${scala-short.version}</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
-      <artifactId>scalacheck_2.10</artifactId>
+      <artifactId>scalacheck_${scala-short.version}</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/pom.xml b/pom.xml
index 844ba74252d0f787c1fba76ddb65430e0779a98f..70e883d1fbc0937bf4b4b80c2c30d964cc57bec3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,6 +40,7 @@
     <connection>scm:git:git@github.com:apache/incubator-spark.git</connection>
     <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-spark.git</developerConnection>
     <url>scm:git:git@github.com:apache/incubator-spark.git</url>
+    <tag>HEAD</tag>
   </scm>
   <developers>
     <developer>
@@ -598,7 +599,7 @@
             <junitxml>.</junitxml>
             <filereports>${project.build.directory}/SparkTestSuite.txt</filereports>
             <argLine>-Xms64m -Xmx3g</argLine>
-            <stderr/>
+            <stderr />
           </configuration>
           <executions>
             <execution>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 19d3aa23adca5039a6a17a2b939a450d1f5059b9..2c63c0f40324930d4d0c41eeec1fc0a3a4b53ea6 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -153,6 +153,7 @@ object SparkBuild extends Build {
 
 */
 
+
     libraryDependencies ++= Seq(
         "io.netty"          % "netty-all"       % "4.0.0.CR1",
         "org.eclipse.jetty" % "jetty-server"    % "7.6.8.v20121106",
@@ -179,6 +180,7 @@ object SparkBuild extends Build {
 
   val slf4jVersion = "1.7.2"
 
+  val excludeCglib = ExclusionRule(organization = "org.sonatype.sisu.inject")
   val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson")
   val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
   val excludeAsm = ExclusionRule(organization = "asm")
@@ -202,7 +204,6 @@ object SparkBuild extends Build {
         "commons-daemon"           % "commons-daemon"   % "1.0.10", // workaround for bug HADOOP-9407
         "org.ow2.asm"              % "asm"              % "4.0",
         "com.google.protobuf"      % "protobuf-java"    % "2.4.1",
-        "de.javakaffee"            % "kryo-serializers" % "0.22",
         "com.typesafe.akka"       %% "akka-remote"      % "2.2.1"  excludeAll(excludeNetty), 
         "com.typesafe.akka"       %% "akka-slf4j"       % "2.2.1"  excludeAll(excludeNetty),
         "net.liftweb"             %% "lift-json"        % "2.5.1"  excludeAll(excludeNetty),
@@ -220,7 +221,7 @@ object SparkBuild extends Build {
         "com.codahale.metrics"     % "metrics-ganglia"  % "3.0.0",
         "com.twitter"             %% "chill"            % "0.3.1",
         "com.twitter"              % "chill-java"       % "0.3.1"
-      ) 
+      )
   )
 
   def rootSettings = sharedSettings ++ Seq(
@@ -250,6 +251,7 @@ object SparkBuild extends Build {
         exclude("log4j","log4j")
         exclude("org.apache.cassandra.deps", "avro")
         excludeAll(excludeSnappy)
+        excludeAll(excludeCglib)
     )
   ) ++ assemblySettings ++ extraAssemblySettings
 
@@ -289,10 +291,10 @@ object SparkBuild extends Build {
   def yarnEnabledSettings = Seq(
     libraryDependencies ++= Seq(
       // Exclude rule required for all ?
-      "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
-      "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
-      "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
-      "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm)
+      "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+      "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+      "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+      "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib)
     )
   )
 
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7611b13e82fd616bc31f3473252866dd088a45f3..33dc86525619f8b61db65483dfcd06b24cecc05b 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -29,7 +29,7 @@ from threading import Thread
 
 from pyspark import cloudpickle
 from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
-    read_from_pickle_file
+    read_from_pickle_file, pack_long
 from pyspark.join import python_join, python_left_outer_join, \
     python_right_outer_join, python_cogroup
 from pyspark.statcounter import StatCounter
@@ -690,11 +690,13 @@ class RDD(object):
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java.  Each object is a (splitNumber, [objects]) pair.
         def add_shuffle_key(split, iterator):
+
             buckets = defaultdict(list)
+
             for (k, v) in iterator:
                 buckets[partitionFunc(k) % numPartitions].append((k, v))
             for (split, items) in buckets.iteritems():
-                yield str(split)
+                yield pack_long(split)
                 yield dump_pickle(Batch(items))
         keyed = PipelinedRDD(self, add_shuffle_key)
         keyed._bypass_serializer = True
@@ -831,8 +833,8 @@ class RDD(object):
         >>> sorted(x.subtractByKey(y).collect())
         [('b', 4), ('b', 5)]
         """ 
-        filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0
-        map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]]
+        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
+        map_func = lambda (key, vals): [(key, val) for val in vals[0]]
         return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
 
     def subtract(self, other, numPartitions=None):
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index fecacd12416af2e703fd67c953c02162bb841c13..54fed1c9c70f66e503abb5c523d6327bb9bae8b4 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -67,6 +67,10 @@ def write_long(value, stream):
     stream.write(struct.pack("!q", value))
 
 
+def pack_long(value):
+    return struct.pack("!q", value)
+
+
 def read_int(stream):
     length = stream.read(4)
     if length == "":
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index 05aadc7bdf2f3ea984ab73297ae5ccd9db515600..c983ea5dfb97b42f3aad337375444e1fc4fdc899 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -26,7 +26,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-repl-bin</artifactId>
+  <artifactId>spark-repl-bin_${scala-short.version}</artifactId>
   <packaging>pom</packaging>
   <name>Spark Project REPL binary packaging</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -40,18 +40,18 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-bagel</artifactId>
+      <artifactId>spark-bagel_${scala-short.version}</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-repl</artifactId>
+      <artifactId>spark-repl_${scala-short.version}</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
     </dependency>
@@ -89,7 +89,7 @@
             </goals>
             <configuration>
               <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                 <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                   <resource>reference.conf</resource>
                 </transformer>
diff --git a/repl/pom.xml b/repl/pom.xml
index d4b1ea10be536dac2659f4e8e21fac9cebab209e..ff6649322911b0edef7b94bd4fce3abd9a256f05 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -26,7 +26,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-repl</artifactId>
+  <artifactId>spark-repl_${scala-short.version}</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project REPL</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -39,18 +39,18 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-bagel</artifactId>
+      <artifactId>spark-bagel_${scala-short.version}</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-mllib</artifactId>
+      <artifactId>spark-mllib_${scala-short.version}</artifactId>
       <version>${project.version}</version>
       <scope>runtime</scope>
     </dependency>
@@ -103,14 +103,14 @@
             <configuration>
               <exportAntProperties>true</exportAntProperties>
               <tasks>
-                <property name="spark.classpath" refid="maven.test.classpath"/>
-                <property environment="env"/>
+                <property name="spark.classpath" refid="maven.test.classpath" />
+                <property environment="env" />
                 <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
                   <condition>
                     <not>
                       <or>
-                        <isset property="env.SCALA_HOME"/>
-                        <isset property="env.SCALA_LIBRARY_PATH"/>
+                        <isset property="env.SCALA_HOME" />
+                        <isset property="env.SCALA_LIBRARY_PATH" />
                       </or>
                     </not>
                   </condition>
diff --git a/streaming/pom.xml b/streaming/pom.xml
index c74e76d526e7c14b32419861f4f78369ebbb43bd..3f2033f34a4d03af6e496b2445d8a2ed7751bf10 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -26,7 +26,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming</artifactId>
+  <artifactId>spark-streaming_${scala-short.version}</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project Streaming</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -42,7 +42,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
@@ -58,6 +58,7 @@
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka</artifactId>
       <version>0.7.2-spark</version>  <!-- Comes from our in-project repository -->
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.flume</groupId>
diff --git a/tools/pom.xml b/tools/pom.xml
index 2456457722406c984a3a8741efb061f8f362739b..db87b54deca5529c9521027576a91c7fac18af98 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -25,7 +25,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-tools</artifactId>
+  <artifactId>spark-tools_${scala-short.version}</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project Tools</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -33,12 +33,12 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming</artifactId>
+      <artifactId>spark-streaming_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 7f852d3540d8e8ebffe588943dda8b0d7ddbc2a3..7770cbb0cc678d6d1e09be689562b51574fe34bb 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -25,7 +25,7 @@
   </parent>
 
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-yarn</artifactId>
+  <artifactId>spark-yarn_${scala-short.version}</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project YARN Support</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -33,7 +33,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core</artifactId>
+      <artifactId>spark-core_${scala-short.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
@@ -97,7 +97,7 @@
             </goals>
             <configuration>
               <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                 <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                   <resource>reference.conf</resource>
                 </transformer>