diff --git a/python/pyspark/join.py b/python/pyspark/join.py index 7036c479806679d539a4779df92ed1bfd94bf694..5f4294fb1b7778090584041286822eabbb0e1efe 100644 --- a/python/pyspark/join.py +++ b/python/pyspark/join.py @@ -32,13 +32,13 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -def _do_python_join(rdd, other, numSplits, dispatch): +def _do_python_join(rdd, other, numPartitions, dispatch): vs = rdd.map(lambda (k, v): (k, (1, v))) ws = other.map(lambda (k, v): (k, (2, v))) - return vs.union(ws).groupByKey(numSplits).flatMapValues(dispatch) + return vs.union(ws).groupByKey(numPartitions).flatMapValues(dispatch) -def python_join(rdd, other, numSplits): +def python_join(rdd, other, numPartitions): def dispatch(seq): vbuf, wbuf = [], [] for (n, v) in seq: @@ -47,10 +47,10 @@ def python_join(rdd, other, numSplits): elif n == 2: wbuf.append(v) return [(v, w) for v in vbuf for w in wbuf] - return _do_python_join(rdd, other, numSplits, dispatch) + return _do_python_join(rdd, other, numPartitions, dispatch) -def python_right_outer_join(rdd, other, numSplits): +def python_right_outer_join(rdd, other, numPartitions): def dispatch(seq): vbuf, wbuf = [], [] for (n, v) in seq: @@ -61,10 +61,10 @@ def python_right_outer_join(rdd, other, numSplits): if not vbuf: vbuf.append(None) return [(v, w) for v in vbuf for w in wbuf] - return _do_python_join(rdd, other, numSplits, dispatch) + return _do_python_join(rdd, other, numPartitions, dispatch) -def python_left_outer_join(rdd, other, numSplits): +def python_left_outer_join(rdd, other, numPartitions): def dispatch(seq): vbuf, wbuf = [], [] for (n, v) in seq: @@ -75,10 +75,10 @@ def python_left_outer_join(rdd, other, numSplits): if not wbuf: wbuf.append(None) return [(v, w) for v in vbuf for w in wbuf] - return _do_python_join(rdd, other, numSplits, dispatch) + return _do_python_join(rdd, other, numPartitions, dispatch) -def python_cogroup(rdd, other, numSplits): +def python_cogroup(rdd, other, numPartitions): vs = rdd.map(lambda (k, v): (k, (1, v))) ws = other.map(lambda (k, v): (k, (2, v))) def dispatch(seq): @@ -89,4 +89,4 @@ def python_cogroup(rdd, other, numSplits): elif n == 2: wbuf.append(v) return (vbuf, wbuf) - return vs.union(ws).groupByKey(numSplits).mapValues(dispatch) + return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 6b6ab6abd97e99b8850ac0764ba2c4ebb4cef5d8..172ed85fab9267b7cfffbec8c0474701c9faf23f 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -215,7 +215,7 @@ class RDD(object): yield pair return java_cartesian.flatMap(unpack_batches) - def groupBy(self, f, numSplits=None): + def groupBy(self, f, numPartitions=None): """ Return an RDD of grouped items. @@ -224,7 +224,7 @@ class RDD(object): >>> sorted([(x, sorted(y)) for (x, y) in result]) [(0, [2, 8]), (1, [1, 1, 3, 5])] """ - return self.map(lambda x: (f(x), x)).groupByKey(numSplits) + return self.map(lambda x: (f(x), x)).groupByKey(numPartitions) def pipe(self, command, env={}): """ @@ -274,7 +274,7 @@ class RDD(object): def reduce(self, f): """ - Reduces the elements of this RDD using the specified commutative and + Reduces the elements of this RDD using the specified commutative and associative binary operator. >>> from operator import add @@ -422,22 +422,22 @@ class RDD(object): """ return dict(self.collect()) - def reduceByKey(self, func, numSplits=None): + def reduceByKey(self, func, numPartitions=None): """ Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. - Output will be hash-partitioned with C{numSplits} splits, or the - default parallelism level if C{numSplits} is not specified. + Output will be hash-partitioned with C{numPartitions} partitions, or + the default parallelism level if C{numPartitions} is not specified. >>> from operator import add >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> sorted(rdd.reduceByKey(add).collect()) [('a', 2), ('b', 1)] """ - return self.combineByKey(lambda x: x, func, func, numSplits) + return self.combineByKey(lambda x: x, func, func, numPartitions) def reduceByKeyLocally(self, func): """ @@ -474,7 +474,7 @@ class RDD(object): """ return self.map(lambda x: x[0]).countByValue() - def join(self, other, numSplits=None): + def join(self, other, numPartitions=None): """ Return an RDD containing all pairs of elements with matching keys in C{self} and C{other}. @@ -489,9 +489,9 @@ class RDD(object): >>> sorted(x.join(y).collect()) [('a', (1, 2)), ('a', (1, 3))] """ - return python_join(self, other, numSplits) + return python_join(self, other, numPartitions) - def leftOuterJoin(self, other, numSplits=None): + def leftOuterJoin(self, other, numPartitions=None): """ Perform a left outer join of C{self} and C{other}. @@ -506,9 +506,9 @@ class RDD(object): >>> sorted(x.leftOuterJoin(y).collect()) [('a', (1, 2)), ('b', (4, None))] """ - return python_left_outer_join(self, other, numSplits) + return python_left_outer_join(self, other, numPartitions) - def rightOuterJoin(self, other, numSplits=None): + def rightOuterJoin(self, other, numPartitions=None): """ Perform a right outer join of C{self} and C{other}. @@ -523,10 +523,10 @@ class RDD(object): >>> sorted(y.rightOuterJoin(x).collect()) [('a', (2, 1)), ('b', (None, 4))] """ - return python_right_outer_join(self, other, numSplits) + return python_right_outer_join(self, other, numPartitions) # TODO: add option to control map-side combining - def partitionBy(self, numSplits, partitionFunc=hash): + def partitionBy(self, numPartitions, partitionFunc=hash): """ Return a copy of the RDD partitioned using the specified partitioner. @@ -535,22 +535,22 @@ class RDD(object): >>> set(sets[0]).intersection(set(sets[1])) set([]) """ - if numSplits is None: - numSplits = self.ctx.defaultParallelism + if numPartitions is None: + numPartitions = self.ctx.defaultParallelism # Transferring O(n) objects to Java is too expensive. Instead, we'll - # form the hash buckets in Python, transferring O(numSplits) objects + # form the hash buckets in Python, transferring O(numPartitions) objects # to Java. Each object is a (splitNumber, [objects]) pair. def add_shuffle_key(split, iterator): buckets = defaultdict(list) for (k, v) in iterator: - buckets[partitionFunc(k) % numSplits].append((k, v)) + buckets[partitionFunc(k) % numPartitions].append((k, v)) for (split, items) in buckets.iteritems(): yield str(split) yield dump_pickle(Batch(items)) keyed = PipelinedRDD(self, add_shuffle_key) keyed._bypass_serializer = True pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD() - partitioner = self.ctx._jvm.PythonPartitioner(numSplits, + partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, id(partitionFunc)) jrdd = pairRDD.partitionBy(partitioner).values() rdd = RDD(jrdd, self.ctx) @@ -561,7 +561,7 @@ class RDD(object): # TODO: add control over map-side aggregation def combineByKey(self, createCombiner, mergeValue, mergeCombiners, - numSplits=None): + numPartitions=None): """ Generic function to combine the elements for each key using a custom set of aggregation functions. @@ -586,8 +586,8 @@ class RDD(object): >>> sorted(x.combineByKey(str, add, add).collect()) [('a', '11'), ('b', '1')] """ - if numSplits is None: - numSplits = self.ctx.defaultParallelism + if numPartitions is None: + numPartitions = self.ctx.defaultParallelism def combineLocally(iterator): combiners = {} for (k, v) in iterator: @@ -597,7 +597,7 @@ class RDD(object): combiners[k] = mergeValue(combiners[k], v) return combiners.iteritems() locally_combined = self.mapPartitions(combineLocally) - shuffled = locally_combined.partitionBy(numSplits) + shuffled = locally_combined.partitionBy(numPartitions) def _mergeCombiners(iterator): combiners = {} for (k, v) in iterator: @@ -609,10 +609,10 @@ class RDD(object): return shuffled.mapPartitions(_mergeCombiners) # TODO: support variant with custom partitioner - def groupByKey(self, numSplits=None): + def groupByKey(self, numPartitions=None): """ Group the values for each key in the RDD into a single sequence. - Hash-partitions the resulting RDD with into numSplits partitions. + Hash-partitions the resulting RDD with into numPartitions partitions. >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> sorted(x.groupByKey().collect()) @@ -630,7 +630,7 @@ class RDD(object): return a + b return self.combineByKey(createCombiner, mergeValue, mergeCombiners, - numSplits) + numPartitions) # TODO: add tests def flatMapValues(self, f): @@ -659,7 +659,7 @@ class RDD(object): return self.cogroup(other) # TODO: add variant with custom parittioner - def cogroup(self, other, numSplits=None): + def cogroup(self, other, numPartitions=None): """ For each key k in C{self} or C{other}, return a resulting RDD that contains a tuple with the list of values for that key in C{self} as well @@ -670,7 +670,7 @@ class RDD(object): >>> sorted(x.cogroup(y).collect()) [('a', ([1], [2])), ('b', ([4], []))] """ - return python_cogroup(self, other, numSplits) + return python_cogroup(self, other, numPartitions) # TODO: `lookup` is disabled because we can't make direct comparisons based # on the key; we need to compare the hash of the key to the hash of the