Skip to content
Snippets Groups Projects
Commit b8afe305 authored by Prashant Sharma's avatar Prashant Sharma Committed by Matei Zaharia
Browse files

SPARK-1162 Added top in python.

Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #93 from ScrapCodes/SPARK-1162/pyspark-top-takeOrdered and squashes the following commits:

ece1fa4 [Prashant Sharma] Added top in python.
parent 5d1ec64e
No related branches found
No related tags found
No related merge requests found
......@@ -29,6 +29,7 @@ from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
from heapq import heappush, heappop, heappushpop
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
......@@ -660,6 +661,30 @@ class RDD(object):
m1[k] += v
return m1
return self.mapPartitions(countPartition).reduce(mergeMaps)
def top(self, num):
"""
Get the top N elements from a RDD.
Note: It returns the list sorted in ascending order.
>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
[12]
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().top(2)
[5, 6]
"""
def topIterator(iterator):
q = []
for k in iterator:
if len(q) < num:
heappush(q, k)
else:
heappushpop(q, k)
yield q
def merge(a, b):
return next(topIterator(a + b))
return sorted(self.mapPartitions(topIterator).reduce(merge))
def take(self, num):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment