diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index c1ef46a1cded743d59f7d1e333d3936e001bd3df..7c0f67bc99e8363d6a44e797354fba7f6227578c 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -168,9 +168,9 @@ The following tables list the transformations and actions currently supported (s
     Iterator[T] => Iterator[U] when running on an RDD of type T. </td>
 </tr>
 <tr>
-  <td> <b>mapPartitionsWithSplit</b>(<i>func</i>) </td>
+  <td> <b>mapPartitionsWithIndex</b>(<i>func</i>) </td>
   <td> Similar to mapPartitions, but also provides <i>func</i> with an integer value representing the index of
-  the split, so <i>func</i> must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
+  the partition, so <i>func</i> must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
   </td>
 </tr>
 <tr>
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6fb4a7b3be25dfaeea2a7bdca0512868ade70a58..1ad4b5298758ba50816fe3946423e87c3d1314c6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -27,6 +27,7 @@ import traceback
 from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile
 from threading import Thread
+import warnings
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, pack_long
@@ -179,7 +180,7 @@ class RDD(object):
         [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
         """
         def func(s, iterator): return chain.from_iterable(imap(f, iterator))
-        return self.mapPartitionsWithSplit(func, preservesPartitioning)
+        return self.mapPartitionsWithIndex(func, preservesPartitioning)
 
     def mapPartitions(self, f, preservesPartitioning=False):
         """
@@ -191,10 +192,24 @@ class RDD(object):
         [3, 7]
         """
         def func(s, iterator): return f(iterator)
-        return self.mapPartitionsWithSplit(func)
+        return self.mapPartitionsWithIndex(func)
+
+    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+        """
+        Return a new RDD by applying a function to each partition of this RDD,
+        while tracking the index of the original partition.
+
+        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
+        >>> def f(splitIndex, iterator): yield splitIndex
+        >>> rdd.mapPartitionsWithIndex(f).sum()
+        6
+        """
+        return PipelinedRDD(self, f, preservesPartitioning)
 
     def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
         """
+        Deprecated: use mapPartitionsWithIndex instead.
+
         Return a new RDD by applying a function to each partition of this RDD,
         while tracking the index of the original partition.
 
@@ -203,7 +218,9 @@ class RDD(object):
         >>> rdd.mapPartitionsWithSplit(f).sum()
         6
         """
-        return PipelinedRDD(self, f, preservesPartitioning)
+        warnings.warn("mapPartitionsWithSplit is deprecated; "
+            "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
+        return self.mapPartitionsWithIndex(f, preservesPartitioning)
 
     def filter(self, f):
         """
@@ -235,7 +252,7 @@ class RDD(object):
         >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
         [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
         """
-        return self.mapPartitionsWithSplit(RDDSampler(withReplacement, fraction, seed).func, True)
+        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
 
     # this is ported from scala/spark/RDD.scala
     def takeSample(self, withReplacement, num, seed):
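
For context, a minimal usage sketch of the renamed API (not part of the patch; assumes a local `SparkContext` and the `f` helper shown here, which mirrors the doctest in the patch):

```python
from pyspark import SparkContext

sc = SparkContext("local", "mapPartitionsWithIndex demo")

# Four partitions, so the index passed to f ranges over 0..3.
rdd = sc.parallelize([1, 2, 3, 4], 4)

# f receives the partition index and an iterator over that partition's
# elements -- the (Int, Iterator[T]) => Iterator[U] shape from the docs.
def f(splitIndex, iterator):
    yield splitIndex

print(rdd.mapPartitionsWithIndex(f).sum())  # 0 + 1 + 2 + 3 == 6

# The old name still works, but per this patch it now emits a
# DeprecationWarning and delegates to mapPartitionsWithIndex.
print(rdd.mapPartitionsWithSplit(f).sum())  # also 6, with a warning
```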