Skip to content
Snippets Groups Projects
Commit dee8ff1b authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Added a second version of union() without varargs.

parent b51d733a
No related branches found
No related tags found
No related merge requests found
...@@ -274,7 +274,11 @@ class SparkContext( ...@@ -274,7 +274,11 @@ class SparkContext(
} }
/** Build the union of a list of RDDs. */ /** Build the union of a list of RDDs. */
def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = new UnionRDD(this, rdds) def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
/** Build the union of a list of RDDs. */
def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] =
new UnionRDD(this, Seq(first) ++ rest)
// Methods for creating shared variables // Methods for creating shared variables
......
...@@ -177,7 +177,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork ...@@ -177,7 +177,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = { override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
implicit val cm: ClassManifest[T] = first.classManifest implicit val cm: ClassManifest[T] = first.classManifest
sc.union(rdds: _*)(cm) sc.union(rdds)(cm)
} }
override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]]) override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
...@@ -186,12 +186,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork ...@@ -186,12 +186,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
implicit val cm: ClassManifest[(K, V)] = first.classManifest implicit val cm: ClassManifest[(K, V)] = first.classManifest
implicit val kcm: ClassManifest[K] = first.kManifest implicit val kcm: ClassManifest[K] = first.kManifest
implicit val vcm: ClassManifest[V] = first.vManifest implicit val vcm: ClassManifest[V] = first.vManifest
new JavaPairRDD(sc.union(rdds: _*)(cm))(kcm, vcm) new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
} }
override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = { override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd) val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
new JavaDoubleRDD(sc.union(rdds: _*)) new JavaDoubleRDD(sc.union(rdds))
} }
def intAccumulator(initialValue: Int): Accumulator[Int] = def intAccumulator(initialValue: Int): Accumulator[Int] =
......
...@@ -30,6 +30,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter { ...@@ -30,6 +30,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
assert(partitionSums.collect().toList === List(3, 7)) assert(partitionSums.collect().toList === List(3, 7))
} }
test("SparkContext.union") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
assert(sc.union(Seq(nums)).collect().toList === List(1, 2, 3, 4))
assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
}
test("aggregate") { test("aggregate") {
sc = new SparkContext("local", "test") sc = new SparkContext("local", "test")
val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3))) val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment