Skip to content
Snippets Groups Projects
Commit 084839ba authored by Prashant Sharma's avatar Prashant Sharma Committed by Josh Rosen
Browse files

Merge pull request #498 from ScrapCodes/python-api. Closes #498.

Python api additions

Author: Prashant Sharma <prashant.s@imaginea.com>

== Merge branch commits ==

commit 8b51591f1a7a79a62c13ee66ff8d83040f7eccd8
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Fri Jan 24 11:50:29 2014 +0530

    Josh's and Patrick's review comments.

commit d37f9677838e43bef6c18ef61fbf08055ba6d1ca
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 17:27:17 2014 +0530

    fixed doc tests

commit 27cb54bf5c99b1ea38a73858c291d0a1c43d8b7c
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 16:48:43 2014 +0530

    Added keys and values methods for PairFunctions in python

commit 4ce76b396fbaefef2386d7a36d611572bdef9b5d
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 13:51:26 2014 +0530

    Added foreachPartition

commit 05f05341a187cba829ac0e6c2bdf30be49948c89
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 13:02:59 2014 +0530

    Added coalesce function to python API

commit 6568d2c2fa14845dc56322c0f39ba2e13b3b26dd
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 12:52:44 2014 +0530

    added repartition function to python API.
parent 79c95527
No related branches found
No related tags found
No related merge requests found
...@@ -455,6 +455,18 @@ class RDD(object): ...@@ -455,6 +455,18 @@ class RDD(object):
yield None yield None
self.mapPartitions(processPartition).collect() # Force evaluation self.mapPartitions(processPartition).collect() # Force evaluation
def foreachPartition(self, f):
    """
    Applies a function to each partition of this RDD.

    `f` is called with an iterator over a partition's elements and is run
    for its side effects only. It may be a generator (as in the example
    below), return any other iterable, or return nothing at all; whatever
    it produces is discarded.

    >>> def f(iterator):
    ...     for x in iterator:
    ...         print x
    ...         yield None
    >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
    """
    def processPartition(iterator):
        result = f(iterator)
        # A purely side-effecting f returns None; hand back an empty
        # iterator instead so the mapped partition is still iterable.
        if result is None:
            return iter(())
        return result
    self.mapPartitions(processPartition).collect()  # Force evaluation
def collect(self): def collect(self):
""" """
Return a list that contains all of the elements in this RDD. Return a list that contains all of the elements in this RDD.
...@@ -695,6 +707,24 @@ class RDD(object): ...@@ -695,6 +707,24 @@ class RDD(object):
""" """
return dict(self.collect()) return dict(self.collect())
def keys(self):
    """
    Return an RDD with the keys of each tuple.

    Each element is expected to be a (key, value) pair; the first item of
    every element is kept.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
    >>> m.collect()
    [1, 3]
    """
    # Index instead of using a tuple parameter (`lambda (k, v): k`), which
    # is Python-2-only syntax and a SyntaxError on Python 3.
    return self.map(lambda kv: kv[0])
def values(self):
    """
    Return an RDD with the values of each tuple.

    Each element is expected to be a (key, value) pair; the second item of
    every element is kept.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
    >>> m.collect()
    [2, 4]
    """
    # Index instead of using a tuple parameter (`lambda (k, v): v`), which
    # is Python-2-only syntax and a SyntaxError on Python 3.
    return self.map(lambda kv: kv[1])
def reduceByKey(self, func, numPartitions=None): def reduceByKey(self, func, numPartitions=None):
""" """
Merge the values for each key using an associative reduce function. Merge the values for each key using an associative reduce function.
...@@ -987,6 +1017,36 @@ class RDD(object): ...@@ -987,6 +1017,36 @@ class RDD(object):
""" """
return self.map(lambda x: (f(x), x)) return self.map(lambda x: (f(x), x))
def repartition(self, numPartitions):
    """
    Return a new RDD that has exactly numPartitions partitions.

    The level of parallelism may go up or down. The data is redistributed
    with a shuffle internally, so when the partition count is only being
    reduced, `coalesce` is worth considering because it can avoid the
    shuffle.

    >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
    >>> sorted(rdd.glom().collect())
    [[1], [2, 3], [4, 5], [6, 7]]
    >>> len(rdd.repartition(2).glom().collect())
    2
    >>> len(rdd.repartition(10).glom().collect())
    10
    """
    # Delegate to the wrapped Java RDD and re-wrap the result with this
    # RDD's context and deserializer.
    return RDD(self._jrdd.repartition(numPartitions),
               self.ctx,
               self._jrdd_deserializer)
def coalesce(self, numPartitions, shuffle=False):
    """
    Return a new RDD that is reduced into `numPartitions` partitions.

    :param numPartitions: target number of partitions.
    :param shuffle: passed through to the wrapped Java RDD's `coalesce`;
        defaults to False, i.e. no shuffle is requested.

    >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
    [[1], [2, 3], [4, 5]]
    >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
    [[1, 2, 3, 4, 5]]
    """
    # Forward `shuffle` as well; previously the argument was accepted but
    # silently dropped, so coalesce(n, shuffle=True) never shuffled.
    jrdd = self._jrdd.coalesce(numPartitions, shuffle)
    return RDD(jrdd, self.ctx, self._jrdd_deserializer)
# TODO: `lookup` is disabled because we can't make direct comparisons based # TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the # on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those # keys in the pairs. This could be an expensive operation, since those
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment