diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index ebbbbd88061a1df7c205638f0872d7b9f724e107..9680c6f3e1475d7f1111a5461a5613aebbbc6177 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -134,13 +134,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return a new RDD by applying a function to each partition of this RDD. */ + def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType()) + } + + /** + * Return a new RDD by applying a function to each partition of this RDD. + */ def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = { def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue())) } /** - * Return a new RDD by applying a function to each partition of this RDD. + * Return a new RDD by applying a function to each partition of this RDD. */ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]): JavaPairRDD[K2, V2] = { @@ -148,6 +156,31 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType()) } + + /** + * Return a new RDD by applying a function to each partition of this RDD. + */ + def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue())) + } + + /** + * Return a new RDD by applying a function to each partition of this RDD. + */ + def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean): + JavaPairRDD[K2, V2] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType()) + } + + /** + * Applies a function f to each partition of this RDD. + */ + def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) { + rdd.foreachPartition((x => f(asJavaIterator(x)))) + } + /** * Return an RDD created by coalescing all elements within each partition into an array. */ @@ -461,4 +494,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) def name(): String = rdd.name + + /** Reset generator */ + def setGenerator(_generator: String) = { + rdd.setGenerator(_generator) + } }