Commit feba7ee5 authored by Matei Zaharia's avatar Matei Zaharia

SPARK-815. Python parallelize() should split lists before batching

One unfortunate consequence of this fix is that we materialize any
collections that are given to us as generators, but this seems necessary
to get reasonable behavior on small collections. We could add a
batchSize parameter later to bypass auto-computation of batch size if
this becomes a problem (e.g. if users really want to parallelize big
generators nicely).
parent d75c3086
@@ -141,14 +141,21 @@ class SparkContext(object):
     def parallelize(self, c, numSlices=None):
         """
         Distribute a local Python collection to form an RDD.
+
+        >>> sc.parallelize(range(5), 5).glom().collect()
+        [[0], [1], [2], [3], [4]]
         """
         numSlices = numSlices or self.defaultParallelism
         # Calling the Java parallelize() method with an ArrayList is too slow,
         # because it sends O(n) Py4J commands. As an alternative, serialized
         # objects are written to a file and loaded through textFile().
         tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
-        if self.batchSize != 1:
-            c = batched(c, self.batchSize)
+        # Make sure we distribute data evenly if it's smaller than self.batchSize
+        if "__len__" not in dir(c):
+            c = list(c)    # Make it a list so we can compute its length
+        batchSize = min(len(c) // numSlices, self.batchSize)
+        if batchSize > 1:
+            c = batched(c, batchSize)
         for x in c:
             write_with_length(dump_pickle(x), tempFile)
         tempFile.close()
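A minimal standalone sketch of the batching logic this change introduces. The `batched` helper mirrors the chunking used in PySpark's serializers, while `split_for_parallelize` and `default_batch_size` are hypothetical names invented here for illustration; the real code writes each record to a temp file rather than returning a list.

```python
def batched(iterator, batch_size):
    """Group items from an iterator into lists of at most batch_size elements."""
    batch = []
    for item in iterator:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

def split_for_parallelize(c, num_slices, default_batch_size=1024):
    """Sketch of the fixed batch-size computation (hypothetical helper).

    Capping the batch size at len(c) // num_slices guarantees at least
    num_slices separate records, so a small collection is still spread
    across all partitions instead of collapsing into one big batch.
    """
    if not hasattr(c, "__len__"):
        c = list(c)  # materialize generators so len() works
    batch_size = min(len(c) // num_slices, default_batch_size)
    if batch_size > 1:
        c = batched(c, batch_size)
    return list(c)

# With 5 elements and 5 slices, batch_size is 1, so each element stays
# a separate record and can land in its own partition:
print(split_for_parallelize(range(5), 5))        # [0, 1, 2, 3, 4]

# With 10 elements and 2 slices, batch_size is 5, giving two batches:
print(split_for_parallelize(range(10), 2))       # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
```

Note the trade-off the commit message mentions: a generator has no `__len__`, so it must be materialized with `list(c)` before the batch size can be computed.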