diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index e7408e4352abfc2e53e20f91039e186ff72c139f..a6e00c3a8484f7158291f2b4acaafe35d401d621 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -88,6 +88,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions)) } + /** + * Merge the values for each key using an associative function and a neutral "zero value". + */ + def foldByKey[V1 >: V](zeroValue: V1)(op: (V1, V1) => V1): RDD[(K, V1)] = { + groupByKey.mapValues(seq => seq.fold(zeroValue)(op)) + } + /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a