diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py index 8477f6dd02a1b2e9f595cbe14726870461f489a4..e2137fe06c9c6bf8a523fe89228fe12546600170 100644 --- a/pyspark/pyspark/rdd.py +++ b/pyspark/pyspark/rdd.py @@ -1,5 +1,5 @@ from base64 import standard_b64encode as b64enc -from collections import Counter +from collections import defaultdict from itertools import chain, ifilter, imap import shlex from subprocess import Popen, PIPE @@ -198,13 +198,18 @@ class RDD(object): def countByValue(self): """ - >>> sc.parallelize([1, 2, 1, 2, 2]).countByValue().most_common() - [(2, 3), (1, 2)] + >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) + [(1, 2), (2, 3)] """ def countPartition(iterator): - yield Counter(iterator) + counts = defaultdict(int) + for obj in iterator: + counts[obj] += 1 + yield counts def mergeMaps(m1, m2): - return m1 + m2 + for (k, v) in m2.iteritems(): + m1[k] += v + return m1 return self.mapPartitions(countPartition).reduce(mergeMaps) def take(self, num): @@ -271,7 +276,7 @@ class RDD(object): def countByKey(self): """ >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) - >>> rdd.countByKey().most_common() + >>> sorted(rdd.countByKey().items()) [('a', 2), ('b', 1)] """ return self.map(lambda x: x[0]).countByValue() diff --git a/python/tc.py b/python/tc.py deleted file mode 100644 index 5dcc4317e0f1f7e6b55f7793cfbe58a13d031c7d..0000000000000000000000000000000000000000 --- a/python/tc.py +++ /dev/null @@ -1,22 +0,0 @@ -from rdd import SparkContext - -sc = SparkContext("local", "PythonWordCount") -e = [(1, 2), (2, 3), (4, 1)] - -tc = sc.parallelizePairs(e) - -edges = tc.mapPairs(lambda (x, y): (y, x)) - -oldCount = 0 -nextCount = tc.count() - -def project(x): - return (x[1][1], x[1][0]) - -while nextCount != oldCount: - oldCount = nextCount - tc = tc.union(tc.join(edges).mapPairs(project)).distinct() - nextCount = tc.count() - -print "TC has %i edges" % tc.count() -print tc.collect()