From 8edc9d0330c94b50e01956ae88693cff4e0977b2 Mon Sep 17 00:00:00 2001
From: Erik Selin <erik.selin@jadedpixel.com>
Date: Tue, 3 Jun 2014 13:31:16 -0700
Subject: [PATCH] [SPARK-1468] Modify the partition function used by
 partitionBy.

Make partitionBy use a tweaked version of hash as its default partition function
since the python hash function does not consistently assign the same value
to None across python processes.

Associated JIRA at https://issues.apache.org/jira/browse/SPARK-1468

Author: Erik Selin <erik.selin@jadedpixel.com>

Closes #371 from tyro89/consistent_hashing and squashes the following commits:

201c301 [Erik Selin] Make partitionBy use a tweaked version of hash as its default partition function since the python hash function does not consistently assign the same value to None across python processes.
---
 python/pyspark/rdd.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index f3b1f1a665..1b3c460dd6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1062,7 +1062,7 @@ class RDD(object):
         return python_right_outer_join(self, other, numPartitions)
 
     # TODO: add option to control map-side combining
-    def partitionBy(self, numPartitions, partitionFunc=hash):
+    def partitionBy(self, numPartitions, partitionFunc=None):
         """
         Return a copy of the RDD partitioned using the specified partitioner.
 
@@ -1073,6 +1073,9 @@ class RDD(object):
         """
         if numPartitions is None:
             numPartitions = self.ctx.defaultParallelism
+
+        if partitionFunc is None:
+            partitionFunc = lambda x: 0 if x is None else hash(x)
         # Transferring O(n) objects to Java is too expensive.  Instead, we'll
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java.  Each object is a (splitNumber, [objects]) pair.
-- 
GitLab