From e03bc379ee03fde0ee4fa578d3c39aae35c63f01 Mon Sep 17 00:00:00 2001
From: Holden Karau <holden@pigscanfly.ca>
Date: Thu, 24 Apr 2014 23:07:54 -0700
Subject: [PATCH] SPARK-1242 Add aggregate to python rdd

Author: Holden Karau <holden@pigscanfly.ca>

Closes #139 from holdenk/add_aggregate_to_python_api and squashes the following commits:

0f39ae3 [Holden Karau] Merge in master
4879c75 [Holden Karau] CR feedback, fix issue with empty RDDs in aggregate
70b4724 [Holden Karau] Style fixes from code review
96b047b [Holden Karau] Add aggregate to python rdd
---
 python/pyspark/rdd.py | 31 +++++++++++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index d73ab7006e..a59778c721 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -599,7 +599,7 @@ class RDD(object):
     def reduce(self, f):
         """
         Reduces the elements of this RDD using the specified commutative and
-        associative binary operator.
+        associative binary operator. Currently reduces partitions locally.
 
         >>> from operator import add
         >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
@@ -641,7 +641,34 @@ class RDD(object):
         vals = self.mapPartitions(func).collect()
         return reduce(op, vals, zeroValue)
 
-    # TODO: aggregate
+    def aggregate(self, zeroValue, seqOp, combOp):
+        """
+        Aggregate the elements of each partition, and then the results for all
+        the partitions, using a given combine functions and a neutral "zero
+        value."
+
+        The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
+        as its result value to avoid object allocation; however, it should not
+        modify C{t2}.
+
+        The first function (seqOp) can return a different result type, U, than
+        the type of this RDD. Thus, we need one operation for merging a T into an U
+        and one operation for merging two U
+
+        >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
+        >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
+        >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
+        (10, 4)
+        >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
+        (0, 0)
+        """
+        def func(iterator):
+            acc = zeroValue
+            for obj in iterator:
+                acc = seqOp(acc, obj)
+            yield acc
+
+        return self.mapPartitions(func).fold(zeroValue, combOp)
         
 
     def max(self):
-- 
GitLab