diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index ae10f615d1312be7f8dea3cca7862fcb98cc297d..1c3d94e1b0831d69e4635e328e69a0511a94e3f4 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -80,6 +80,14 @@
 #     /metrics/applications/json # App information
 #     /metrics/master/json      # Master information
 
+# org.apache.spark.metrics.sink.GraphiteSink
+#   Name:     Default:      Description:
+#   host      NONE          Hostname of Graphite server
+#   port      NONE          Port of Graphite server
+#   period    10            Poll period
+#   unit      seconds       Units of poll period
+#   prefix    EMPTY STRING  Prefix to prepend to metric name
+
 ## Examples
 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
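
For reference, a minimal sketch of enabling the new sink through conf/metrics.properties, using the property names documented above. The host, port, and prefix values below are placeholders rather than shipped defaults (2003 is merely Graphite's conventional plaintext port):

    # Enable GraphiteSink for all instances by class name
    *.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
    *.sink.graphite.host=graphite.example.com
    *.sink.graphite.port=2003
    *.sink.graphite.period=10
    *.sink.graphite.unit=seconds
    *.sink.graphite.prefix=spark
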
diff --git a/core/pom.xml b/core/pom.xml
index 468dd7124920912bf23b5457db67a53dd401beae..e2033c9912018c94d22585b514fb57ca2a3ce7f3 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -150,6 +150,10 @@
       <groupId>com.codahale.metrics</groupId>
       <artifactId>metrics-ganglia</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-graphite</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
new file mode 100644
index 0000000000000000000000000000000000000000..cdcfec8ca785bcafc3574164654f699d12da4768
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+import java.net.InetSocketAddress
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.graphite.{GraphiteReporter, Graphite}
+
+import org.apache.spark.metrics.MetricsSystem
+
+class GraphiteSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val GRAPHITE_DEFAULT_PERIOD = 10
+  val GRAPHITE_DEFAULT_UNIT = "SECONDS"
+  val GRAPHITE_DEFAULT_PREFIX = ""
+
+  val GRAPHITE_KEY_HOST = "host"
+  val GRAPHITE_KEY_PORT = "port"
+  val GRAPHITE_KEY_PERIOD = "period"
+  val GRAPHITE_KEY_UNIT = "unit"
+  val GRAPHITE_KEY_PREFIX = "prefix"
+
+  def propertyToOption(prop: String) = Option(property.getProperty(prop))
+
+  if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) {
+    throw new Exception("Graphite sink requires 'host' property.")
+  }
+
+  if (!propertyToOption(GRAPHITE_KEY_PORT).isDefined) {
+    throw new Exception("Graphite sink requires 'port' property.")
+  }
+
+  val host = propertyToOption(GRAPHITE_KEY_HOST).get
+  val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt
+
+  val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match {
+    case Some(s) => s.toInt
+    case None => GRAPHITE_DEFAULT_PERIOD
+  }
+
+  val pollUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
+    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+    case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
+  }
+
+  val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX)
+
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+  val graphite: Graphite = new Graphite(new InetSocketAddress(host, port))
+
+  val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry)
+      .convertDurationsTo(TimeUnit.MILLISECONDS)
+      .convertRatesTo(TimeUnit.SECONDS)
+      .prefixedWith(prefix)
+      .build(graphite)
+
+  override def start() {
+    reporter.start(pollPeriod, pollUnit)
+  }
+
+  override def stop() {
+    reporter.stop()
+  }
+}
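
A hedged sketch of how the new sink consumes its configuration. In practice MetricsSystem instantiates sinks reflectively from the metrics properties file, so constructing one by hand as below is purely illustrative, and the host and port values are invented:

    import java.util.Properties

    import com.codahale.metrics.MetricRegistry

    import org.apache.spark.metrics.sink.GraphiteSink

    object GraphiteSinkSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.setProperty("host", "graphite.example.com")  // required; invented value
        props.setProperty("port", "2003")                   // required; invented value
        props.setProperty("period", "10")                   // optional, defaults to 10
        props.setProperty("unit", "seconds")                // optional, defaults to seconds
        props.setProperty("prefix", "spark")                // optional, defaults to ""

        // The constructor validates 'host'/'port' and checks the minimal polling period.
        val sink = new GraphiteSink(props, new MetricRegistry())
        sink.start()  // the reporter begins polling the registry every 10 seconds
        sink.stop()
      }
    }
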
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3c237ca20a4c8c7d9c341fabe966f4559b824f15..5f6407aadc4ef96cf3c4d20e2bf6f25b1eeea5fe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -551,17 +551,27 @@ abstract class RDD[T: ClassTag](
   def zipPartitions[B: ClassTag, V: ClassTag]
       (rdd2: RDD[B])
       (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
+    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)
+
+  def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
+      (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
+      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
 
   def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
       (rdd2: RDD[B], rdd3: RDD[C])
       (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
+    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false)
+
+  def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
+      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
+      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
 
   def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
       (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
       (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
-    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
+    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)
 
 
   // Actions (launch a job to return a value to the user program)
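
A hedged usage sketch of the new preservesPartitioning flag (the data, the HashPartitioner setup, and the object name are invented for illustration). The point is only that, with the flag set, the zipped RDD advertises the first parent's partitioner instead of None, so downstream key-based operations on unchanged keys can avoid a shuffle:

    import org.apache.spark.{HashPartitioner, SparkContext}
    import org.apache.spark.SparkContext._

    object ZipPartitionsSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "zipPartitionsSketch")
        val part = new HashPartitioner(4)
        // Three co-partitioned key/value RDDs with identical keys per partition.
        val a = sc.parallelize(1 to 100).map(i => (i, i)).partitionBy(part)
        val b = a.mapValues(_ * 2)  // mapValues keeps a's partitioner
        val c = a.mapValues(_ * 3)

        // preservesPartitioning = true, so the result reports a's partitioner.
        val zipped = a.zipPartitions(b, c, true) { (ia, ib, ic) =>
          ia.zip(ib).zip(ic).map { case (((k, x), (_, y)), (_, z)) => (k, x + y + z) }
        }
        println(zipped.partitioner == Some(part))  // true; it would be None without the flag
        sc.stop()
      }
    }
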
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index e02c17bf45514c0cc1e5750763d599864fc2a807..9313bf87ec6e3e039ce423e802d12c1f0ad05be4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -40,9 +40,13 @@ private[spark] class ZippedPartitionsPartition(
 
 abstract class ZippedPartitionsBaseRDD[V: ClassTag](
     sc: SparkContext,
-    var rdds: Seq[RDD[_]])
+    var rdds: Seq[RDD[_]],
+    preservesPartitioning: Boolean = false)
   extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) {
 
+  override val partitioner =
+    if (preservesPartitioning) firstParent[Any].partitioner else None
+
   override def getPartitions: Array[Partition] = {
     val sizes = rdds.map(x => x.partitions.size)
     if (!sizes.forall(x => x == sizes(0))) {
@@ -77,8 +81,9 @@ class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
     sc: SparkContext,
     f: (Iterator[A], Iterator[B]) => Iterator[V],
     var rdd1: RDD[A],
-    var rdd2: RDD[B])
-  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
+    var rdd2: RDD[B],
+    preservesPartitioning: Boolean = false)
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
@@ -98,8 +103,9 @@ class ZippedPartitionsRDD3
     f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
     var rdd1: RDD[A],
     var rdd2: RDD[B],
-    var rdd3: RDD[C])
-  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) {
+    var rdd3: RDD[C],
+    preservesPartitioning: Boolean = false)
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
@@ -123,8 +129,9 @@ class ZippedPartitionsRDD4
     var rdd1: RDD[A],
     var rdd2: RDD[B],
     var rdd3: RDD[C],
-    var rdd4: RDD[D])
-  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) {
+    var rdd4: RDD[D],
+    preservesPartitioning: Boolean = false)
+  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 1dc71a04282e52940e9297f284b480a9d046dad7..0f2deb4bcbbb207b7757cd183896ceb3dc7b2ff9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -167,6 +167,7 @@ private[spark] class ShuffleMapTask(
       var totalTime = 0L
       val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
         writer.commit()
+        writer.close()
         val size = writer.fileSegment().length
         totalBytes += size
         totalTime += writer.timeWriting()
@@ -184,14 +185,16 @@ private[spark] class ShuffleMapTask(
     } catch { case e: Exception =>
       // If there is an exception from running the task, revert the partial writes
       // and throw the exception upstream to Spark.
-      if (shuffle != null) {
-        shuffle.writers.foreach(_.revertPartialWrites())
+      if (shuffle != null && shuffle.writers != null) {
+        for (writer <- shuffle.writers) {
+          writer.revertPartialWrites()
+          writer.close()
+        }
       }
       throw e
     } finally {
       // Release the writers back to the shuffle block manager.
       if (shuffle != null && shuffle.writers != null) {
-        shuffle.writers.foreach(_.close())
         shuffle.releaseWriters(success)
       }
       // Execute the callbacks on task completion.
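
The intent of this hunk, as I read it, is that each writer is now closed as soon as its fate is decided, so buffered data is flushed before fileSegment().length is read on the success path, while the finally block only releases the writer group. A schematic sketch of that lifecycle, with a stand-in Writer trait rather than the real BlockObjectWriter:

    // Stand-in for BlockObjectWriter; only the calls relevant to the lifecycle are shown.
    trait Writer {
      def commit(): Unit
      def revertPartialWrites(): Unit
      def close(): Unit
      def segmentLength(): Long  // stand-in for fileSegment().length
    }

    def writeShuffleOutput(writers: Array[Writer])(doWrites: => Unit): Array[Long] = {
      try {
        doWrites
        // Success: commit, then close immediately so the segment length is accurate.
        writers.map { w => w.commit(); w.close(); w.segmentLength() }
      } catch { case e: Exception =>
        // Failure: revert the partial writes, close, and rethrow.
        writers.foreach { w => w.revertPartialWrites(); w.close() }
        throw e
      } finally {
        // Only release the (already closed) writers back to the shuffle block manager here.
      }
    }
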
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 469e68fed74bb4effb3aa9efa11157859d047084..b4451fc7b8e5666a9c93c8e89d27ebb08b224543 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -93,6 +93,8 @@ class DiskBlockObjectWriter(
     def write(i: Int): Unit = callWithTiming(out.write(i))
     override def write(b: Array[Byte]) = callWithTiming(out.write(b))
     override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
+    override def close() = out.close()
+    override def flush() = out.flush()
   }
 
   private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean
diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
index f60deafc6f32386bd1640fefaee233942936cf76..8bb4ee3bfa22e3ad1233d778e59feedc892115f7 100644
--- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
@@ -35,6 +35,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   private var capacity = nextPowerOf2(initialCapacity)
   private var mask = capacity - 1
   private var curSize = 0
+  private var growThreshold = LOAD_FACTOR * capacity
 
   // Holds keys and values in the same array for memory locality; specifically, the order of
   // elements is key0, value0, key1, value1, key2, value2, etc.
@@ -56,7 +57,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     var i = 1
     while (true) {
       val curKey = data(2 * pos)
-      if (k.eq(curKey) || k == curKey) {
+      if (k.eq(curKey) || k.equals(curKey)) {
         return data(2 * pos + 1).asInstanceOf[V]
       } else if (curKey.eq(null)) {
         return null.asInstanceOf[V]
@@ -80,9 +81,23 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
       haveNullValue = true
       return
     }
-    val isNewEntry = putInto(data, k, value.asInstanceOf[AnyRef])
-    if (isNewEntry) {
-      incrementSize()
+    var pos = rehash(key.hashCode) & mask
+    var i = 1
+    while (true) {
+      val curKey = data(2 * pos)
+      if (curKey.eq(null)) {
+        data(2 * pos) = k
+        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+        incrementSize()  // Since we added a new key
+        return
+      } else if (k.eq(curKey) || k.equals(curKey)) {
+        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+        return
+      } else {
+        val delta = i
+        pos = (pos + delta) & mask
+        i += 1
+      }
     }
   }
 
@@ -104,7 +119,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     var i = 1
     while (true) {
       val curKey = data(2 * pos)
-      if (k.eq(curKey) || k == curKey) {
+      if (k.eq(curKey) || k.equals(curKey)) {
         val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
         data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
         return newValue
@@ -161,45 +176,17 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   /** Increase table size by 1, rehashing if necessary */
   private def incrementSize() {
     curSize += 1
-    if (curSize > LOAD_FACTOR * capacity) {
+    if (curSize > growThreshold) {
       growTable()
     }
   }
 
   /**
-   * Re-hash a value to deal better with hash functions that don't differ
-   * in the lower bits, similar to java.util.HashMap
+   * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
+   * We use the Murmur Hash 3 finalization step that's also used in fastutil.
    */
   private def rehash(h: Int): Int = {
-    val r = h ^ (h >>> 20) ^ (h >>> 12)
-    r ^ (r >>> 7) ^ (r >>> 4)
-  }
-
-  /**
-   * Put an entry into a table represented by data, returning true if
-   * this increases the size of the table or false otherwise. Assumes
-   * that "data" has at least one empty slot.
-   */
-  private def putInto(data: Array[AnyRef], key: AnyRef, value: AnyRef): Boolean = {
-    val mask = (data.length / 2) - 1
-    var pos = rehash(key.hashCode) & mask
-    var i = 1
-    while (true) {
-      val curKey = data(2 * pos)
-      if (curKey.eq(null)) {
-        data(2 * pos) = key
-        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
-        return true
-      } else if (curKey.eq(key) || curKey == key) {
-        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
-        return false
-      } else {
-        val delta = i
-        pos = (pos + delta) & mask
-        i += 1
-      }
-    }
-    return false  // Never reached but needed to keep compiler happy
+    it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
   }
 
   /** Double the table's size and re-hash everything */
@@ -211,16 +198,36 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
       throw new Exception("Can't make capacity bigger than 2^29 elements")
     }
     val newData = new Array[AnyRef](2 * newCapacity)
-    var pos = 0
-    while (pos < capacity) {
-      if (!data(2 * pos).eq(null)) {
-        putInto(newData, data(2 * pos), data(2 * pos + 1))
+    val newMask = newCapacity - 1
+    // Insert all our old values into the new array. Note that because our old keys are
+    // unique, there's no need to check for equality here when we insert.
+    var oldPos = 0
+    while (oldPos < capacity) {
+      if (!data(2 * oldPos).eq(null)) {
+        val key = data(2 * oldPos)
+        val value = data(2 * oldPos + 1)
+        var newPos = rehash(key.hashCode) & newMask
+        var i = 1
+        var keepGoing = true
+        while (keepGoing) {
+          val curKey = newData(2 * newPos)
+          if (curKey.eq(null)) {
+            newData(2 * newPos) = key
+            newData(2 * newPos + 1) = value
+            keepGoing = false
+          } else {
+            val delta = i
+            newPos = (newPos + delta) & newMask
+            i += 1
+          }
+        }
       }
-      pos += 1
+      oldPos += 1
     }
     data = newData
     capacity = newCapacity
-    mask = newCapacity - 1
+    mask = newMask
+    growThreshold = LOAD_FACTOR * newCapacity
   }
 
   private def nextPowerOf2(n: Int): Int = {
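
Since the new rehash delegates to fastutil, here is, for reference, what the MurmurHash3 32-bit finalization ("avalanche") step does. To the best of my knowledge this mirrors it.unimi.dsi.fastutil.HashCommon.murmurHash3(int), but the sketch below is only illustrative and is not what the code actually calls:

    // MurmurHash3 fmix32 finalizer: spreads entropy from the high bits into the low bits,
    // which matters here because the table index is taken as hash & mask (low bits only).
    def murmur3Finalize(hash: Int): Int = {
      var h = hash
      h ^= h >>> 16
      h *= 0x85ebca6b
      h ^= h >>> 13
      h *= 0xc2b2ae35
      h ^= h >>> 16
      h
    }
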
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 02adcb41c6f1d320ebe56ad941616ad7cff1dd79..3f7858d2dedc311cdbf70017d10d0c17cd173dbc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -824,4 +824,28 @@ private[spark] object Utils extends Logging {
     return System.getProperties().clone()
       .asInstanceOf[java.util.Properties].toMap[String, String]
   }
+
+  /**
+   * Executes the given function numIters times, purely for its side effects.
+   * Unlike a for-comprehension, this while-loop form permits JVM JIT optimization.
+   */
+  def times(numIters: Int)(f: => Unit): Unit = {
+    var i = 0
+    while (i < numIters) {
+      f
+      i += 1
+    }
+  }
+
+  /**
+   * Times, in milliseconds, numIters iterations of f; the loop form permits JVM JIT optimization.
+   * @param numIters number of iterations
+   * @param f function to be executed in each iteration
+   */
+  def timeIt(numIters: Int)(f: => Unit): Long = {
+    val start = System.currentTimeMillis
+    times(numIters)(f)
+    System.currentTimeMillis - start
+  }
+
 }
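
A small hedged usage sketch of the two new helpers (Utils is private[spark], so the sketch assumes it lives somewhere under the org.apache.spark package; the closures are arbitrary busy-work):

    package org.apache.spark.util

    object TimingSketch {
      def main(args: Array[String]) {
        var acc = 0L
        // Run a closure 1000 times purely for its side effects.
        Utils.times(1000) { acc += 1 }

        // Measure, in milliseconds, how long 100000 iterations of a closure take.
        val elapsedMs = Utils.timeIt(100000) { math.sqrt(acc.toDouble) }
        println("acc=" + acc + ", elapsed=" + elapsedMs + "ms")
      }
    }
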
diff --git a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e9907e6c855aea4cc945f98c48b59293a441a1d8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.{Random => JavaRandom}
+import org.apache.spark.util.Utils.timeIt
+
+/**
+ * This class implements an XORShift random number generator algorithm.
+ * Source:
+ * Marsaglia, G. (2003). Xorshift RNGs. Journal of Statistical Software, Vol. 8, Issue 14.
+ * @see <a href="http://www.jstatsoft.org/v08/i14/paper">Paper</a>
+ * This implementation is approximately 3.5 times faster than
+ * {@link java.util.Random java.util.Random}, partly because of the algorithm itself and partly
+ * because it gives up thread safety. The JDK's implementation uses an AtomicLong seed, while
+ * this class uses a regular Long. We can forgo thread safety because we create a new
+ * instance of the RNG for each thread.
+ */
+private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) {
+  
+  def this() = this(System.nanoTime)
+
+  private var seed = init
+  
+  // we need to just override next - this will be called by nextInt, nextDouble,
+  // nextGaussian, nextLong, etc.
+  override protected def next(bits: Int): Int = {    
+    var nextSeed = seed ^ (seed << 21)
+    nextSeed ^= (nextSeed >>> 35)
+    nextSeed ^= (nextSeed << 4)  
+    seed = nextSeed
+    (nextSeed & ((1L << bits) -1)).asInstanceOf[Int]
+  }
+}
+
+/** Contains benchmark method and main method to run benchmark of the RNG */
+private[spark] object XORShiftRandom {
+
+  /**
+   * Main method for running benchmark
+   * @param args takes one argument - the number of random numbers to generate
+   */
+  def main(args: Array[String]): Unit = {
+    if (args.length != 1) {
+      println("Benchmark of XORShiftRandom vis-a-vis java.util.Random")
+      println("Usage: XORShiftRandom number_of_random_numbers_to_generate")
+      System.exit(1)
+    }
+    println(benchmark(args(0).toInt))
+  }
+
+  /**
+   * @param numIters Number of random numbers to generate while running the benchmark
+   * @return Map of execution times for {@link java.util.Random java.util.Random}
+   * and XORShift
+   */
+  def benchmark(numIters: Int) = {
+
+    val seed = 1L
+    val million = 1e6.toInt
+    val javaRand = new JavaRandom(seed)
+    val xorRand = new XORShiftRandom(seed)
+    
+    // this is just to warm up the JIT - we're not timing anything
+    timeIt(million) {
+      javaRand.nextInt()
+      xorRand.nextInt()
+    }
+
+    def iters(f: => Unit): Long = timeIt(numIters)(f)
+    
+    /* Return results as a map instead of just printing to screen,
+     * in case the user wants to do something with them. */
+    Map("javaTime" -> iters {javaRand.nextInt()},
+        "xorTime" -> iters {xorRand.nextInt()})
+
+  }
+
+}
\ No newline at end of file
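
A hedged usage sketch (XORShiftRandom is private[spark], so the sketch assumes it sits under the org.apache.spark package; the seed and iteration count are arbitrary):

    package org.apache.spark.util

    object XORShiftRandomSketch {
      def main(args: Array[String]) {
        // Drop-in replacement for java.util.Random wherever thread safety is not required.
        val rng = new XORShiftRandom(42L)
        println(rng.nextInt(100))    // nextInt/nextDouble/nextGaussian all route through next(bits)
        println(rng.nextDouble())
        println(rng.nextGaussian())

        // Compare against java.util.Random: returns Map("javaTime" -> ..., "xorTime" -> ...).
        println(XORShiftRandom.benchmark(10000000))
      }
    }
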
diff --git a/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b78367b6cac028b217abea0fcbb79c08ab270045
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.Random
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import org.apache.spark.util.Utils.times
+
+
+class XORShiftRandomSuite extends FunSuite with ShouldMatchers {
+
+  def fixture = new {
+    val seed = 1L
+    val xorRand = new XORShiftRandom(seed)
+    val hundMil = 1e8.toInt
+  }
+   
+  /*
+   * This test is based on a chi-squared test for randomness. The values are hard-coded
+   * so as not to add a Spark dependency on apache.commons.math3 just to call one
+   * method that calculates the exact p-value for a given number of random numbers
+   * and bins. Should one want to move to a full-fledged test based on
+   * apache.commons.math3, the relevant class is here:
+   * org.apache.commons.math3.stat.inference.ChiSquareTest
+   */
+  test ("XORShift generates valid random numbers") {
+
+    val f = fixture
+
+    val numBins = 10
+    // create 10 bins
+    val bins = Array.fill(numBins)(0)
+
+    // populate bins based on modulus of the random number
+    times(f.hundMil) {bins(math.abs(f.xorRand.nextInt) % numBins) += 1}
+
+    /* since the seed is deterministic, until the algorithm is changed, we know the result will be 
+     * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 
+     * 10000790, 10002286, 9998699), so the test will never fail at the prespecified (5%) 
+     * significance level. However, should the RNG implementation change, the test should still 
+     * pass at the same significance level. The chi-squared test done in R gave the following 
+     * results:
+     *   > chisq.test(c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272,
+     *     10000790, 10002286, 9998699))
+     *     Chi-squared test for given probabilities
+     *     data:  c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 10000790, 
+     *            10002286, 9998699)
+     *     X-squared = 11.975, df = 9, p-value = 0.2147
+     * Note that the p-value was ~0.22. The test will fail if alpha < 0.05, which for
+     * 100 million random numbers and 10 bins happens at an X-squared value of ~16.9196.
+     * So, the test will fail if the computed X-squared statistic is greater than or
+     * equal to that number.
+     */
+    val binSize = f.hundMil/numBins
+    val xSquared = bins.map(x => math.pow((binSize - x), 2)/binSize).sum
+    xSquared should be <  (16.9196)
+
+  }
+
+}
\ No newline at end of file
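
For reference, the statistic the test computes at the end is the usual chi-squared goodness-of-fit statistic with equal expected counts per bin, which in LaTeX form is:

    \chi^2 = \sum_{i=1}^{k} \frac{(O_i - E_i)^2}{E_i}, \qquad E_i = \frac{N}{k}

With N = 10^8 draws and k = 10 bins, E_i = 10^7, and the assertion xSquared < 16.9196 corresponds to the 5% critical value of the chi-squared distribution with 9 degrees of freedom quoted in the comment.
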
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 5f456b999b0e187434bfcee09b572a2cde508657..5ed0474477302b2768bdd59e6a66afea867a126c 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -50,6 +50,7 @@ Each instance can report to zero or more _sinks_. Sinks are contained in the
 * `GangliaSink`: Sends metrics to a Ganglia node or multicast group.
 * `JmxSink`: Registers metrics for viewing in a JMX console.
 * `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
+* `GraphiteSink`: Sends metrics to a Graphite node.
 
 The syntax of the metrics configuration file is defined in an example configuration file, 
 `$SPARK_HOME/conf/metrics.conf.template`.
diff --git a/docs/tuning.md b/docs/tuning.md
index f491ae9b95c08b757bbd3630575c5eee1a89e40e..f33fda37ebaba4d6b34d775bdf0190c987a3da83 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -67,7 +67,7 @@ The [Kryo documentation](http://code.google.com/p/kryo/) describes more advanced
 registration options, such as adding custom serialization code.
 
 If your objects are large, you may also need to increase the `spark.kryoserializer.buffer.mb`
-system property. The default is 32, but this value needs to be large enough to hold the *largest*
+system property. The default is 2, but this value needs to be large enough to hold the *largest*
 object you will serialize.
 
 Finally, if you don't register your classes, Kryo will still work, but it will have to store the
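
A hedged sketch of raising that buffer from application code, for the Spark version this patch targets (system properties must be set before the SparkContext is created; 64 is an arbitrary example value, not a recommendation):

    import org.apache.spark.SparkContext

    object KryoBufferSketch {
      def main(args: Array[String]) {
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        System.setProperty("spark.kryoserializer.buffer.mb", "64")  // example value only
        val sc = new SparkContext("local", "kryoBufferSketch")
        // ... run jobs that serialize large objects with Kryo ...
        sc.stop()
      }
    }
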
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index edbf77dbcc3a44b9d7b38a6080904a8dbcf2079b..0dee9399a86ea2bd5a9d9292f5f3d550fdce76c1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -18,15 +18,16 @@
 package org.apache.spark.mllib.clustering
 
 import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
+
+import org.jblas.DoubleMatrix
 
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.Logging
 import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.util.XORShiftRandom
 
-import org.jblas.DoubleMatrix
 
 
 /**
@@ -195,7 +196,7 @@ class KMeans private (
    */
   private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = {
     // Sample all the cluster centers in one pass to avoid repeated scans
-    val sample = data.takeSample(true, runs * k, new Random().nextInt()).toSeq
+    val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq
     Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).toArray)
   }
 
@@ -210,7 +211,7 @@ class KMeans private (
    */
   private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
     // Initialize each run's center to a random point
-    val seed = new Random().nextInt()
+    val seed = new XORShiftRandom().nextInt()
     val sample = data.takeSample(true, runs, seed).toSeq
     val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
 
@@ -222,7 +223,7 @@ class KMeans private (
         for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays(r), point))
       }.reduceByKey(_ + _).collectAsMap()
       val chosen = data.mapPartitionsWithIndex { (index, points) =>
-        val rand = new Random(seed ^ (step << 16) ^ index)
+        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
         for {
           p <- points
           r <- 0 until runs
diff --git a/pom.xml b/pom.xml
index 4be9d3a3d23c5c69d796f179fb0a124ad8a8ce0a..8700a4828f3faf7b462892de9b60b8638c404eba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -354,6 +354,11 @@
         <artifactId>metrics-ganglia</artifactId>
         <version>3.0.0</version>
       </dependency>
+      <dependency>
+        <groupId>com.codahale.metrics</groupId>
+        <artifactId>metrics-graphite</artifactId>
+        <version>3.0.0</version>
+      </dependency>
       <dependency>
         <groupId>org.scala-lang</groupId>
         <artifactId>scala-compiler</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aa5fe70470262760da7c508bba12d772b1b4e6f2..3584e884f1753d06f2ebf993f5494d53952a91e7 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -205,9 +205,9 @@ object SparkBuild extends Build {
         "log4j"                    % "log4j"            % "1.2.17",
         "org.slf4j"                % "slf4j-api"        % slf4jVersion,
         "org.slf4j"                % "slf4j-log4j12"    % slf4jVersion,
+        "commons-daemon"           % "commons-daemon"   % "1.0.10", // workaround for bug HADOOP-9407
         "com.ning"                 % "compress-lzf"     % "0.8.4",
         "org.xerial.snappy"        % "snappy-java"      % "1.0.5",
-        "commons-daemon"           % "commons-daemon"   % "1.0.10", // workaround for bug HADOOP-9407
         "org.ow2.asm"              % "asm"              % "4.0",
         "com.google.protobuf"      % "protobuf-java"    % "2.4.1",
         "com.typesafe.akka"       %% "akka-remote"      % "2.2.3"  excludeAll(excludeNetty), 
@@ -226,6 +226,7 @@ object SparkBuild extends Build {
         "com.codahale.metrics"     % "metrics-jvm"      % "3.0.0",
         "com.codahale.metrics"     % "metrics-json"     % "3.0.0",
         "com.codahale.metrics"     % "metrics-ganglia"  % "3.0.0",
+        "com.codahale.metrics"     % "metrics-graphite" % "3.0.0",
         "com.twitter"             %% "chill"            % "0.3.1",
         "com.twitter"              % "chill-java"       % "0.3.1"
       )