Skip to content
Snippets Groups Projects
Commit f18fd05b authored by Kan Zhang, committed by Reynold Xin
Browse files

[SPARK-1519] Support minPartitions param of wholeTextFiles() in PySpark

Author: Kan Zhang <kzhang@apache.org>

Closes #697 from kanzhang/SPARK-1519 and squashes the following commits:

4f8d1ed [Kan Zhang] [SPARK-1519] Support minPartitions param of wholeTextFiles() in PySpark
parent ba5d4a99
No related branches found
No related tags found
No related merge requests found
...@@ -211,6 +211,13 @@ class SparkContext(object): ...@@ -211,6 +211,13 @@ class SparkContext(object):
""" """
return self._jsc.sc().defaultParallelism() return self._jsc.sc().defaultParallelism()
@property
def defaultMinPartitions(self):
"""
Default min number of partitions for Hadoop RDDs when not given by user
"""
return self._jsc.sc().defaultMinPartitions()
def __del__(self): def __del__(self):
self.stop() self.stop()
...@@ -264,7 +271,7 @@ class SparkContext(object): ...@@ -264,7 +271,7 @@ class SparkContext(object):
return RDD(self._jsc.textFile(name, minPartitions), self, return RDD(self._jsc.textFile(name, minPartitions), self,
UTF8Deserializer()) UTF8Deserializer())
def wholeTextFiles(self, path): def wholeTextFiles(self, path, minPartitions=None):
""" """
Read a directory of text files from HDFS, a local file system Read a directory of text files from HDFS, a local file system
(available on all nodes), or any Hadoop-supported file system (available on all nodes), or any Hadoop-supported file system
...@@ -300,7 +307,8 @@ class SparkContext(object): ...@@ -300,7 +307,8 @@ class SparkContext(object):
>>> sorted(textFiles.collect()) >>> sorted(textFiles.collect())
[(u'.../1.txt', u'1'), (u'.../2.txt', u'2')] [(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
""" """
return RDD(self._jsc.wholeTextFiles(path), self, minPartitions = minPartitions or self.defaultMinPartitions
return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
PairDeserializer(UTF8Deserializer(), UTF8Deserializer())) PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
def _checkpointFile(self, name, input_deserializer): def _checkpointFile(self, name, input_deserializer):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment