Skip to content
Snippets Groups Projects
Commit 60e74572 authored by eklavya's avatar eklavya
Browse files

fixed ClassTag in mapPartitions

parent 1442cd5d
No related branches found
No related tags found
No related merge requests found
...@@ -134,13 +134,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { ...@@ -134,13 +134,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/** /**
* Return a new RDD by applying a function to each partition of this RDD. * Return a new RDD by applying a function to each partition of this RDD.
*/ */
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType())
}
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = { def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue())) new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
} }
/** /**
* Return a new RDD by applying a function to each partition of this RDD. * Return a new RDD by applying a function to each partition of this RDD.
*/ */
def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]): def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = { JavaPairRDD[K2, V2] = {
...@@ -148,13 +156,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { ...@@ -148,13 +156,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType()) JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
} }
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[U](
f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
}
/** /**
* Return a new RDD by applying a function to each partition of this RDD. * Return a new RDD by applying a function to each partition of this RDD.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment