Commit ec2dadb5 authored by Matei Zaharia

Merge pull request #417 from JoshRosen/spark-668

Fix JavaRDDLike.flatMap(PairFlatMapFunction) (SPARK-668).
parents 2435b7b5 d49cf0e5
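For context, a sketch of my own and not part of the commit: SPARK-668 concerns Java callers being unable to use the flatMap overload that takes a PairFlatMapFunction and returns a JavaPairRDD. The snippet below shows the call shape this commit is meant to support, using the pre-Apache spark.api.java classes that appear in the diff; the class name PairFlatMapExample, the "local" master string, and the sample data are illustrative assumptions.

// Illustrative only -- not part of this commit. Shows the overload that
// SPARK-668 concerns: flatMap with a PairFlatMapFunction, producing a
// JavaPairRDD of (key, value) pairs.
import java.util.Arrays;
import java.util.Collections;

import scala.Tuple2;

import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.PairFlatMapFunction;

public class PairFlatMapExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "PairFlatMapExample");
    JavaRDD<String> words = sc.parallelize(Arrays.asList("a", "bb", "ccc"));

    // The overload this commit fixes for Java callers: each element maps to
    // zero or more (key, value) pairs, and the result is a JavaPairRDD.
    JavaPairRDD<String, Integer> lengths = words.flatMap(
        new PairFlatMapFunction<String, String, Integer>() {
          @Override
          public Iterable<Tuple2<String, Integer>> call(String s) throws Exception {
            return Collections.singletonList(new Tuple2<String, Integer>(s, s.length()));
          }
        });

    lengths.collect();
    sc.stop();
  }
}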
JavaRDDLike.scala:
@@ -12,7 +12,7 @@ import spark.storage.StorageLevel
 import com.google.common.base.Optional
 
-trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
+trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround[T] {
 
   def wrapRDD(rdd: RDD[T]): This
 
   implicit val classManifest: ClassManifest[T]
@@ -82,10 +82,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
-   * Return a new RDD by first applying a function to all elements of this
-   * RDD, and then flattening the results.
+   * Part of the workaround for SPARK-668; called in PairFlatMapWorkaround.java.
    */
-  def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
+  private[spark] def doFlatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
     def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
...
PairFlatMapWorkaround.java (new file):
package spark.api.java;

import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDDLike;
import spark.api.java.function.PairFlatMapFunction;

import java.io.Serializable;

/**
 * Workaround for SPARK-668.
 */
class PairFlatMapWorkaround<T> implements Serializable {
    /**
     * Return a new RDD by first applying a function to all elements of this
     * RDD, and then flattening the results.
     */
    public <K, V> JavaPairRDD<K, V> flatMap(PairFlatMapFunction<T, K, V> f) {
        return ((JavaRDDLike<T, ?>) this).doFlatMap(f);
    }
}
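The design choice, as I read the diff: the overload that Java callers need is declared on a plain Java superclass, which simply casts this back to the trait and delegates to the package-private doFlatMap; the cast assumes every PairFlatMapWorkaround instance is actually a JavaRDDLike. A minimal sketch of the same cast-and-delegate pattern, with hypothetical names rather than Spark code:

// Hypothetical names, illustrative only. Mirrors PairFlatMapWorkaround's
// trick: the publicly visible method is declared on a Java superclass and
// forwards to a package-private implementation on the subtype.
import java.io.Serializable;

class OverloadWorkaround<T> implements Serializable {
    // The entry point callers resolve against this Java class.
    public String describe(T element) {
        // Safe only if every instance is really an Impl<T>, just as the
        // commit's cast assumes every instance is a JavaRDDLike.
        return ((Impl<T>) this).doDescribe(element);
    }
}

class Impl<T> extends OverloadWorkaround<T> {
    // Package-private "real" implementation, analogous to doFlatMap.
    String doDescribe(T element) {
        return "element: " + element;
    }
}

Calling new Impl<String>().describe("x") goes through the superclass method and lands in doDescribe; in the commit, the corresponding landing point is doFlatMap on JavaRDDLike.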
JavaAPISuite.java:
@@ -355,6 +355,34 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(11, pairs.count());
   }
 
+  @Test
+  public void mapsFromPairsToPairs() {
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+        new Tuple2<Integer, String>(1, "a"),
+        new Tuple2<Integer, String>(2, "aa"),
+        new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+
+    // Regression test for SPARK-668:
+    JavaPairRDD<String, Integer> swapped = pairRDD.flatMap(
+        new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
+          @Override
+          public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
+            return Collections.singletonList(item.swap());
+          }
+        });
+    swapped.collect();
+
+    // There was never a bug here, but it's worth testing:
+    pairRDD.map(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
+      @Override
+      public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
+        return item.swap();
+      }
+    }).collect();
+  }
+
   @Test
   public void mapPartitions() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
...