From f4e6b9361ffeec1018d5834f09db9fd86f2ba7bd Mon Sep 17 00:00:00 2001 From: Stephen Haberman <stephen@exigencecorp.com> Date: Fri, 4 Jan 2013 22:43:22 -0600 Subject: [PATCH] Add RDD.collect(PartialFunction). --- core/src/main/scala/spark/RDD.scala | 7 +++++++ core/src/test/scala/spark/RDDSuite.scala | 1 + 2 files changed, 8 insertions(+) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 7e38583391..5163c80134 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -329,6 +329,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial */ def toArray(): Array[T] = collect() + /** + * Return an RDD that contains all matching values by applying `f`. + */ + def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = { + filter(f.isDefinedAt).map(f) + } + /** * Reduces the elements of this RDD using the specified associative binary operator. */ diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 45e6c5f840..872b06fd08 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -35,6 +35,7 @@ class RDDSuite extends FunSuite with BeforeAndAfter { assert(nums.flatMap(x => 1 to x).collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)) assert(nums.union(nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4)) assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4))) + assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4")) val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _))) assert(partitionSums.collect().toList === List(3, 7)) -- GitLab