From dee8ff1b9da2381b7542dfb2d8085fa45372ec36 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Fri, 27 Jul 2012 16:27:52 -0700
Subject: [PATCH] Added a second version of union() that takes a Seq instead of varargs.

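The varargs-only signature forced callers that already held a Seq[RDD[T]]
to splat it with `rdds: _*`, as the Java API wrappers did. The collection
overload now takes a Seq[RDD[T]] directly, and a varargs overload is kept
as (first: RDD[T], rest: RDD[T]*) so that the two signatures remain
distinct (a plain RDD[T]* overload would desugar to the same Seq parameter
type as the new method).

A minimal usage sketch of the two overloads (assuming a SparkContext
named `sc`; the RDDs `a` and `b` are illustrative names):

    val a = sc.makeRDD(Seq(1, 2))
    val b = sc.makeRDD(Seq(3, 4))
    sc.union(a, b)        // varargs overload: first RDD plus the rest
    sc.union(Seq(a, b))   // Seq overload: no `: _*` splat needed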
---
 core/src/main/scala/spark/SparkContext.scala             | 6 +++++-
 .../src/main/scala/spark/api/java/JavaSparkContext.scala | 6 +++---
 core/src/test/scala/spark/RDDSuite.scala                 | 9 +++++++++
 3 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index c3e93f964e..dd17d4d6b3 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -274,7 +274,11 @@ class SparkContext(
   }
 
   /** Build the union of a list of RDDs. */
-  def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = new UnionRDD(this, rdds)
+  def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
+
+  /** Build the union of a list of RDDs. */
+  def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] =
+    new UnionRDD(this, Seq(first) ++ rest)
 
   // Methods for creating shared variables
 
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index 2d43bfa4ef..08c92b145e 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -177,7 +177,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
     val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
     implicit val cm: ClassManifest[T] = first.classManifest
-    sc.union(rdds: _*)(cm)
+    sc.union(rdds)(cm)
   }
 
   override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
@@ -186,12 +186,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     implicit val cm: ClassManifest[(K, V)] = first.classManifest
     implicit val kcm: ClassManifest[K] = first.kManifest
     implicit val vcm: ClassManifest[V] = first.vManifest
-    new JavaPairRDD(sc.union(rdds: _*)(cm))(kcm, vcm)
+    new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
   }
 
   override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
     val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
-    new JavaDoubleRDD(sc.union(rdds: _*))
+    new JavaDoubleRDD(sc.union(rdds))
   }
 
   def intAccumulator(initialValue: Int): Accumulator[Int] =
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 3924a6890b..4a79c086e9 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -30,6 +30,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     assert(partitionSums.collect().toList === List(3, 7))
   }
 
+  test("SparkContext.union") {
+    sc = new SparkContext("local", "test")
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
+    assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
+    assert(sc.union(Seq(nums)).collect().toList === List(1, 2, 3, 4))
+    assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
+  }
+
   test("aggregate") {
     sc = new SparkContext("local", "test")
     val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
-- 
GitLab