diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index c3e93f964ecf327bffdd08910bfacaab360d38a0..dd17d4d6b32a409452c96001425bdb62c90bddde 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -274,7 +274,11 @@ class SparkContext(
   }
 
   /** Build the union of a list of RDDs. */
-  def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = new UnionRDD(this, rdds)
+  def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
+
+  /** Build the union of a list of RDDs. */
+  def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] =
+    new UnionRDD(this, Seq(first) ++ rest)
 
   // Methods for creating shared variables
 
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index 2d43bfa4efb9cd3ae337ce2282dc30d3e2bf5654..08c92b145ea3f17deccb85fb5d580c031ad7ee75 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -177,7 +177,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
     val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
     implicit val cm: ClassManifest[T] = first.classManifest
-    sc.union(rdds: _*)(cm)
+    sc.union(rdds)(cm)
   }
 
   override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
@@ -186,12 +186,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     implicit val cm: ClassManifest[(K, V)] = first.classManifest
     implicit val kcm: ClassManifest[K] = first.kManifest
     implicit val vcm: ClassManifest[V] = first.vManifest
-    new JavaPairRDD(sc.union(rdds: _*)(cm))(kcm, vcm)
+    new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
   }
 
   override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
     val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
-    new JavaDoubleRDD(sc.union(rdds: _*))
+    new JavaDoubleRDD(sc.union(rdds))
   }
 
   def intAccumulator(initialValue: Int): Accumulator[Int] =
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 3924a6890befcbbe28c650e1d4f22dd8d01784f4..4a79c086e9bf618f6ada018ad748d9084b8b97ad 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -30,6 +30,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     assert(partitionSums.collect().toList === List(3, 7))
   }
 
+  test("SparkContext.union") {
+    sc = new SparkContext("local", "test")
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
+    assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
+    assert(sc.union(Seq(nums)).collect().toList === List(1, 2, 3, 4))
+    assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
+  }
+
   test("aggregate") {
     sc = new SparkContext("local", "test")
     val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
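
For context on why the varargs-only signature is split in two: with the old `def union[T: ClassManifest](rdds: RDD[T]*)`, any caller already holding a `Seq[RDD[T]]` had to splat it with `rdds: _*`, as the removed lines in the JavaSparkContext hunks above show. A minimal driver sketch of both overloads after this change, assuming a local master; the `UnionExample` object name is hypothetical and not part of this patch:

    import spark.SparkContext

    // Illustrative standalone driver (hypothetical), not part of this patch.
    object UnionExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "unionExample")
        val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)

        // Resolves to union(first: RDD[T], rest: RDD[T]*): the varargs
        // overload still covers direct call sites like this one.
        val viaVarargs = sc.union(nums, nums)

        // Resolves to union(rdds: Seq[RDD[T]]): a Seq built elsewhere can
        // now be passed directly, without the ": _*" splat the old
        // varargs-only signature required.
        val viaSeq = sc.union(Seq(nums, nums))

        println(viaVarargs.collect().toList) // List(1, 2, 3, 4, 1, 2, 3, 4)
        println(viaSeq.collect().toList)     // List(1, 2, 3, 4, 1, 2, 3, 4)
        sc.stop()
      }
    }

The new RDDSuite test exercises the same four call shapes: one RDD and two RDDs through the varargs overload, and a one-element and two-element Seq through the Seq overload.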