diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index aca235a62a6a86e2e09cab4acbe01e2db2fb8bf5..7d96089e52ab989247f323b9ce23c4b5ac5e28f8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -25,7 +25,7 @@ import scala.language.existentials
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleHandle
 
@@ -66,14 +66,14 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
  */
 @DeveloperApi
 class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
-  extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
+  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
 
   // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
   // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
   // CoGroupValue is the intermediate state of each value before being merged in compute.
-  private type CoGroup = ArrayBuffer[Any]
+  private type CoGroup = CompactBuffer[Any]
   private type CoGroupValue = (Any, Int)  // Int is dependency number
-  private type CoGroupCombiner = Seq[CoGroup]
+  private type CoGroupCombiner = Array[CoGroup]
 
   private var serializer: Option[Serializer] = None
 
@@ -114,7 +114,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override val partitioner: Some[Partitioner] = Some(part)
 
-  override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
+  override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
     val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
     val split = s.asInstanceOf[CoGroupPartition]
@@ -150,7 +150,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
           getCombiner(kv._1)(depNum) += kv._2
         }
       }
-      new InterruptibleIterator(context, map.iterator)
+      new InterruptibleIterator(context,
+        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
     } else {
       val map = createExternalMap(numRdds)
       rddIterators.foreach { case (it, depNum) =>
@@ -161,7 +162,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       }
       context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
       context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
-      new InterruptibleIterator(context, map.iterator)
+      new InterruptibleIterator(context,
+        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index a6b920467283e85337063e9bec1374218e0c7077..c04d162a39616c8a6ec0952e1677b25980c3b97a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -46,6 +46,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.collection.CompactBuffer
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -361,12 +362,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // groupByKey shouldn't use map side combine because map side combine does not
     // reduce the amount of data shuffled and requires all map side data be inserted
     // into a hash table, leading to more objects in the old gen.
-    val createCombiner = (v: V) => ArrayBuffer(v)
-    val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
-    val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
-    val bufs = combineByKey[ArrayBuffer[V]](
+    val createCombiner = (v: V) => CompactBuffer(v)
+    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
+    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
+    val bufs = combineByKey[CompactBuffer[V]](
       createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
-    bufs.mapValues(_.toIterable)
+    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
   }
 
   /**
@@ -571,11 +572,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
-    cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
-       (vs.asInstanceOf[Seq[V]],
-         w1s.asInstanceOf[Seq[W1]],
-         w2s.asInstanceOf[Seq[W2]],
-         w3s.asInstanceOf[Seq[W3]])
+    cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
+       (vs.asInstanceOf[Iterable[V]],
+         w1s.asInstanceOf[Iterable[W1]],
+         w2s.asInstanceOf[Iterable[W2]],
+         w3s.asInstanceOf[Iterable[W3]])
     }
   }
 
@@ -589,8 +590,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    cg.mapValues { case Seq(vs, w1s) =>
-      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
+    cg.mapValues { case Array(vs, w1s) =>
+      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
     }
   }
 
@@ -604,10 +605,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
-    cg.mapValues { case Seq(vs, w1s, w2s) =>
-      (vs.asInstanceOf[Seq[V]],
-        w1s.asInstanceOf[Seq[W1]],
-        w2s.asInstanceOf[Seq[W2]])
+    cg.mapValues { case Array(vs, w1s, w2s) =>
+      (vs.asInstanceOf[Iterable[V]],
+        w1s.asInstanceOf[Iterable[W1]],
+        w2s.asInstanceOf[Iterable[W2]])
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c3a3e90a34901590e4b8c69e1cf65ad4f3882e80..fa79b2575915389b7afbfb57cfc324ff2442c6ff 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -31,6 +31,7 @@ import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage._
 import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
 import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.collection.CompactBuffer
 
 import scala.reflect.ClassTag
 
@@ -185,6 +186,7 @@ private[serializer] object KryoSerializer {
     classOf[GotBlock],
     classOf[GetBlock],
     classOf[MapStatus],
+    classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Byte]],
     classOf[BoundedPriorityQueue[_]],
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d44e15e3c97ea3d8f756825564e2b95a463a29e9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+/**
+ * An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
+ * ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
+ * so it has about 80-100 bytes of overhead. In contrast, CompactBuffer can keep up to two
+ * elements in fields of the main object, and only allocates an Array[AnyRef] if there are more
+ * entries than that. This makes it more efficient for operations like groupBy where we expect
+ * some keys to have very few elements.
+ */
+private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
+  // First two elements
+  private var element0: T = _
+  private var element1: T = _
+
+  // Number of elements, including our two in the main object
+  private var curSize = 0
+
+  // Array for extra elements
+  private var otherElements: Array[AnyRef] = null
+
+  def apply(position: Int): T = {
+    if (position < 0 || position >= curSize) {
+      throw new IndexOutOfBoundsException
+    }
+    if (position == 0) {
+      element0
+    } else if (position == 1) {
+      element1
+    } else {
+      otherElements(position - 2).asInstanceOf[T]
+    }
+  }
+
+  private def update(position: Int, value: T): Unit = {
+    if (position < 0 || position >= curSize) {
+      throw new IndexOutOfBoundsException
+    }
+    if (position == 0) {
+      element0 = value
+    } else if (position == 1) {
+      element1 = value
+    } else {
+      otherElements(position - 2) = value.asInstanceOf[AnyRef]
+    }
+  }
+
+  def += (value: T): CompactBuffer[T] = {
+    val newIndex = curSize
+    if (newIndex == 0) {
+      element0 = value
+      curSize = 1
+    } else if (newIndex == 1) {
+      element1 = value
+      curSize = 2
+    } else {
+      growToSize(curSize + 1)
+      otherElements(newIndex - 2) = value.asInstanceOf[AnyRef]
+    }
+    this
+  }
+
+  def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
+    values match {
+      // Optimize merging of CompactBuffers, used in cogroup and groupByKey
+      case compactBuf: CompactBuffer[T] =>
+        val oldSize = curSize
+        // Copy the other buffer's size and elements to local variables in case it is equal to us
+        val itsSize = compactBuf.curSize
+        val itsElements = compactBuf.otherElements
+        growToSize(curSize + itsSize)
+        if (itsSize == 1) {
+          this(oldSize) = compactBuf.element0
+        } else if (itsSize == 2) {
+          this(oldSize) = compactBuf.element0
+          this(oldSize + 1) = compactBuf.element1
+        } else if (itsSize > 2) {
+          this(oldSize) = compactBuf.element0
+          this(oldSize + 1) = compactBuf.element1
+          // At this point our size is also above 2, so just copy its array directly into ours.
+          // Note that since we added two elements above, the index in this.otherElements that we
+          // should copy to is oldSize.
+          System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize - 2)
+        }
+
+      case _ =>
+        values.foreach(e => this += e)
+    }
+    this
+  }
+
+  override def length: Int = curSize
+
+  override def size: Int = curSize
+
+  override def iterator: Iterator[T] = new Iterator[T] {
+    private var pos = 0
+    override def hasNext: Boolean = pos < curSize
+    override def next(): T = {
+      if (!hasNext) {
+        throw new NoSuchElementException
+      }
+      pos += 1
+      apply(pos - 1)
+    }
+  }
+
+  /** Increase our size to newSize and grow the backing array if needed. */
+  private def growToSize(newSize: Int): Unit = {
+    if (newSize < 0) {
+      throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
+    }
+    val capacity = if (otherElements != null) otherElements.length + 2 else 2
+    if (newSize > capacity) {
+      var newArrayLen = 8
+      while (newSize - 2 > newArrayLen) {
+        newArrayLen *= 2
+        if (newArrayLen == Int.MinValue) {
+          // Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
+          // Note that we set the new array length to Int.MaxValue - 2 so that our capacity
+          // calculation above still gives a positive integer.
+          newArrayLen = Int.MaxValue - 2
+        }
+      }
+      val newArray = new Array[AnyRef](newArrayLen)
+      if (otherElements != null) {
+        System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
+      }
+      otherElements = newArray
+    }
+    curSize = newSize
+  }
+}
+
+private[spark] object CompactBuffer {
+  def apply[T](): CompactBuffer[T] = new CompactBuffer[T]
+
+  def apply[T](value: T): CompactBuffer[T] = {
+    val buf = new CompactBuffer[T]
+    buf += value
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index fc00458083a33acff99a399f829988b9acaf1e32..d1cb2d9d3a53b69aa56bdcf9af6ee2ffa5b326f1 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -156,15 +156,20 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("CoGroupedRDD") {
     val longLineageRDD1 = generateFatPairRDD()
+
+    // Collect the RDD as sequences instead of arrays to enable equality tests in testRDD
+    val seqCollectFunc = (rdd: RDD[(Int, Array[Iterable[Int]])]) =>
+      rdd.map{case (p, a) => (p, a.toSeq)}.collect(): Any
+
     testRDD(rdd => {
       CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
-    })
+    }, seqCollectFunc)
 
     val longLineageRDD2 = generateFatPairRDD()
     testRDDPartitions(rdd => {
       CheckpointSuite.cogroup(
         longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
-    })
+    }, seqCollectFunc)
   }
 
   test("ZippedPartitionsRDD") {
@@ -235,12 +240,19 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     assert(rdd.partitions.size === 0)
   }
 
+  def defaultCollectFunc[T](rdd: RDD[T]): Any = rdd.collect()
+
   /**
    * Test checkpointing of the RDD generated by the given operation. It tests whether the
    * serialized size of the RDD is reduce after checkpointing or not. This function should be called
    * on all RDDs that have a parent RDD (i.e., do not call on ParallelCollection, BlockRDD, etc.).
+   *
+   * @param op an operation to run on the RDD
+   * @param collectFunc a function for collecting the values in the RDD, in case there are
+   *   non-comparable types like arrays that we want to convert to something that supports ==
    */
-  def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
+  def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U],
+      collectFunc: RDD[U] => Any = defaultCollectFunc[U] _) {
     // Generate the final RDD using given RDD operation
     val baseRDD = generateFatRDD()
     val operatedRDD = op(baseRDD)
@@ -258,13 +270,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
     val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
     operatedRDD.checkpoint()
-    val result = operatedRDD.collect()
+    val result = collectFunc(operatedRDD)
     operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
     val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
     logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
 
     // Test whether the checkpoint file has been created
-    assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
+    assert(collectFunc(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get)) === result)
 
     // Test whether dependencies have been changed from its earlier parent RDD
     assert(operatedRDD.dependencies.head.rdd != parentRDD)
@@ -279,7 +291,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     assert(operatedRDD.partitions.length === numPartitions)
 
     // Test whether the data in the checkpointed RDD is same as original
-    assert(operatedRDD.collect() === result)
+    assert(collectFunc(operatedRDD) === result)
 
     // Test whether serialized size of the RDD has reduced.
     logInfo("Size of " + rddType +
@@ -289,7 +301,6 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
       "Size of " + rddType + " did not reduce after checkpointing " +
         " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
     )
-
   }
 
   /**
@@ -300,8 +311,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
    * This function should be called only those RDD whose partitions refer to parent RDD's
    * partitions (i.e., do not call it on simple RDD like MappedRDD).
    *
+   * @param op an operation to run on the RDD
+   * @param collectFunc a function for collecting the values in the RDD, in case there are
+   *   non-comparable types like arrays that we want to convert to something that supports ==
    */
-  def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
+  def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U],
+       collectFunc: RDD[U] => Any = defaultCollectFunc[U] _) {
     // Generate the final RDD using given RDD operation
     val baseRDD = generateFatRDD()
     val operatedRDD = op(baseRDD)
@@ -316,13 +331,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
     val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
     parentRDDs.foreach(_.checkpoint())  // checkpoint the parent RDD, not the generated one
-    val result = operatedRDD.collect()  // force checkpointing
+    val result = collectFunc(operatedRDD)  // force checkpointing
     operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
     val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
     logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
 
     // Test whether the data in the checkpointed RDD is same as original
-    assert(operatedRDD.collect() === result)
+    assert(collectFunc(operatedRDD) === result)
 
     // Test whether serialized size of the partitions has reduced
     logInfo("Size of partitions of " + rddType +
@@ -436,7 +451,7 @@ object CheckpointSuite {
     new CoGroupedRDD[K](
       Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
       part
-    ).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
+    ).asInstanceOf[RDD[(K, Array[Iterable[V]])]]
   }
 
 }
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 237e644b48e49dd068ec779cf2135257046507da..eae67c7747e82865a92c53a5343ba61f69ba3a5a 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -176,7 +176,9 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
     val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
     val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
     val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
-    val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)).collectAsMap()
+    val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2))
+      .map(p => (p._1, p._2.map(_.toArray)))
+      .collectAsMap()
 
     assert(results(1)(0).length === 3)
     assert(results(1)(0).contains(1))
diff --git a/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6c956d93dc80dc07634643569c20c817c0113c35
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import org.scalatest.FunSuite
+
+class CompactBufferSuite extends FunSuite {
+  test("empty buffer") {
+    val b = new CompactBuffer[Int]
+    assert(b.size === 0)
+    assert(b.iterator.toList === Nil)
+    assert(b.size === 0)
+    assert(b.iterator.toList === Nil)
+    intercept[IndexOutOfBoundsException] { b(0) }
+    intercept[IndexOutOfBoundsException] { b(1) }
+    intercept[IndexOutOfBoundsException] { b(2) }
+    intercept[IndexOutOfBoundsException] { b(-1) }
+  }
+
+  test("basic inserts") {
+    val b = new CompactBuffer[Int]
+    assert(b.size === 0)
+    assert(b.iterator.toList === Nil)
+    for (i <- 0 until 1000) {
+      b += i
+      assert(b.size === i + 1)
+      assert(b(i) === i)
+    }
+    assert(b.iterator.toList === (0 until 1000).toList)
+    assert(b.iterator.toList === (0 until 1000).toList)
+    assert(b.size === 1000)
+  }
+
+  test("adding sequences") {
+    val b = new CompactBuffer[Int]
+    assert(b.size === 0)
+    assert(b.iterator.toList === Nil)
+
+    // Add some simple lists and iterators
+    b ++= List(0)
+    assert(b.size === 1)
+    assert(b.iterator.toList === List(0))
+    b ++= Iterator(1)
+    assert(b.size === 2)
+    assert(b.iterator.toList === List(0, 1))
+    b ++= List(2)
+    assert(b.size === 3)
+    assert(b.iterator.toList === List(0, 1, 2))
+    b ++= Iterator(3, 4, 5, 6, 7, 8, 9)
+    assert(b.size === 10)
+    assert(b.iterator.toList === (0 until 10).toList)
+
+    // Add CompactBuffers
+    val b2 = new CompactBuffer[Int]
+    b2 ++= 0 until 10
+    b ++= b2
+    assert(b.iterator.toList === (1 to 2).flatMap(i => 0 until 10).toList)
+    b ++= b2
+    assert(b.iterator.toList === (1 to 3).flatMap(i => 0 until 10).toList)
+    b ++= b2
+    assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList)
+
+    // Add some small CompactBuffers as well
+    val b3 = new CompactBuffer[Int]
+    b ++= b3
+    assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList)
+    b3 += 0
+    b ++= b3
+    assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0))
+    b3 += 1
+    b ++= b3
+    assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0, 0, 1))
+    b3 += 2
+    b ++= b3
+    assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0, 0, 1, 0, 1, 2))
+  }
+
+  test("adding the same buffer to itself") {
+    val b = new CompactBuffer[Int]
+    assert(b.size === 0)
+    assert(b.iterator.toList === Nil)
+    b += 1
+    assert(b.toList === List(1))
+    for (j <- 1 until 8) {
+      b ++= b
+      assert(b.size === (1 << j))
+      assert(b.iterator.toList === (1 to (1 << j)).map(i => 1).toList)
+    }
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
index 88de2c82479b7b7a2356290b25fda4349bef7cb9..1f7de630e778ca4aef9719cce14008447a1710a1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
@@ -122,6 +122,10 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
   private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
     val partitioner = new HashPartitioner(input.partitions.size)
     val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
-    cogrouped.map { case (_, values: Seq[Seq[Double]]) => new DenseVector(values.flatten.toArray) }
+    cogrouped.map {
+      case (_, values: Array[Iterable[_]]) =>
+        val doubles = values.asInstanceOf[Array[Iterable[Double]]]
+        new DenseVector(doubles.flatten.toArray)
+    }
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 40da31318942ed0b653a4fef5469cb505deeaf2a..1a47089e513c4502eacd767e36c8520e43d1e072 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -133,17 +133,17 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     val numOldValues = oldRDDs.size
     val numNewValues = newRDDs.size
 
-    val mergeValues = (seqOfValues: Seq[Seq[V]]) => {
-      if (seqOfValues.size != 1 + numOldValues + numNewValues) {
+    val mergeValues = (arrayOfValues: Array[Iterable[V]]) => {
+      if (arrayOfValues.size != 1 + numOldValues + numNewValues) {
         throw new Exception("Unexpected number of sequences of reduced values")
       }
       // Getting reduced values "old time steps" that will be removed from current window
-      val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
+      val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head)
       // Getting reduced values "new time steps"
       val newValues =
-        (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+        (1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
 
-      if (seqOfValues(0).isEmpty) {
+      if (arrayOfValues(0).isEmpty) {
         // If previous window's reduce value does not exist, then at least new values should exist
         if (newValues.isEmpty) {
           throw new Exception("Neither previous window has value for key, nor new values found. " +
@@ -153,7 +153,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
         newValues.reduce(reduceF) // return
       } else {
         // Get the previous window's reduced value
-        var tempValue = seqOfValues(0).head
+        var tempValue = arrayOfValues(0).head
         // If old values exists, then inverse reduce then from previous value
         if (!oldValues.isEmpty) {
           tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
@@ -166,7 +166,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
       }
     }
 
-    val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
+    val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
+      .mapValues(mergeValues)
 
     if (filterFunc.isDefined) {
       Some(mergedValuesRDD.filter(filterFunc.get))