From ca9f81e8fcf46bceb032f3bd0efb9f6ed2c25656 Mon Sep 17 00:00:00 2001 From: Mark Hamstra <markhamstra@gmail.com> Date: Sat, 16 Mar 2013 13:31:01 -0700 Subject: [PATCH] refactor foldByKey to use combineByKey --- core/src/main/scala/spark/PairRDDFunctions.scala | 14 ++++++++++---- .../main/scala/spark/api/java/JavaPairRDD.scala | 12 +++++++++--- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index ffa6336ceb..e1dde1b497 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -89,22 +89,28 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( } /** - * Merge the values for each key using an associative function and a neutral "zero value". + * Merge the values for each key using an associative function and a neutral "zero value" which may + * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for + * list concatenation, 0 for addition, or 1 for multiplication.). */ def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = { - groupByKey(partitioner).mapValues(seq => seq.fold[V](zeroValue)(func)) + combineByKey[V]({v: V => func(zeroValue, v)}, func, func, partitioner) } /** - * Merge the values for each key using an associative function and a neutral "zero value". + * Merge the values for each key using an associative function and a neutral "zero value" which may + * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for + * list concatenation, 0 for addition, or 1 for multiplication.). */ def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = { foldByKey(zeroValue, new HashPartitioner(numPartitions))(func) } /** - * Merge the values for each key using an associative function and a neutral "zero value". + * Merge the values for each key using an associative function and a neutral "zero value" which may + * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for + * list concatenation, 0 for addition, or 1 for multiplication.). */ def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = { foldByKey(zeroValue, defaultPartitioner(self))(func) diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 1dc73a3584..49aaabf835 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -161,19 +161,25 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap) /** - * Merge the values for each key using an associative function and a neutral "zero value". + * Merge the values for each key using an associative function and a neutral "zero value" which may + * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for + * list concatenation, 0 for addition, or 1 for multiplication.). */ def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] = fromRDD(rdd.foldByKey(zeroValue, partitioner)(func)) /** - * Merge the values for each key using an associative function and a neutral "zero value". + * Merge the values for each key using an associative function and a neutral "zero value" which may + * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for + * list concatenation, 0 for addition, or 1 for multiplication.). */ def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V] = fromRDD(rdd.foldByKey(zeroValue, numPartitions)(func)) /** - * Merge the values for each key using an associative function and a neutral "zero value". + * Merge the values for each key using an associative function and a neutral "zero value" which may + * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for + * list concatenation, 0 for addition, or 1 for multiplication.). */ def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] = fromRDD(rdd.foldByKey(zeroValue)(func)) -- GitLab