From 649e9d0f5b2d5fc13f2dd5be675331510525927f Mon Sep 17 00:00:00 2001
From: Sean Owen <sowen@cloudera.com>
Date: Tue, 26 Jan 2016 11:55:28 +0000
Subject: [PATCH] [SPARK-3369][CORE][STREAMING] Java mapPartitions
 Iterator->Iterable is inconsistent with Scala's Iterator->Iterator

Fix Java function API methods for flatMap and mapPartitions to require
producing only an Iterator, not Iterable. Also fix DStream.flatMap to
require a function producing TraversableOnce only, not Traversable.

CC rxin pwendell for API change; tdas since it also touches streaming.

Author: Sean Owen <sowen@cloudera.com>

Closes #10413 from srowen/SPARK-3369.
---
 .../api/java/function/CoGroupFunction.java | 2 +-
 .../java/function/DoubleFlatMapFunction.java | 3 +-
 .../api/java/function/FlatMapFunction.java | 3 +-
 .../api/java/function/FlatMapFunction2.java | 3 +-
 .../java/function/FlatMapGroupsFunction.java | 2 +-
 .../java/function/MapPartitionsFunction.java | 2 +-
 .../java/function/PairFlatMapFunction.java | 3 +-
 .../apache/spark/api/java/JavaRDDLike.scala | 20 ++++++------
 .../java/org/apache/spark/JavaAPISuite.java | 24 +++++++-------
 docs/streaming-programming-guide.md | 4 +--
 .../apache/spark/examples/JavaPageRank.java | 18 +++++------
 .../apache/spark/examples/JavaWordCount.java | 5 +--
 .../streaming/JavaActorWordCount.java | 5 +--
 .../streaming/JavaCustomReceiver.java | 7 +++--
 .../streaming/JavaDirectKafkaWordCount.java | 6 ++--
 .../streaming/JavaKafkaWordCount.java | 9 +++---
 .../streaming/JavaNetworkWordCount.java | 11 ++++---
 .../JavaRecoverableNetworkWordCount.java | 6 ++--
 .../streaming/JavaSqlNetworkWordCount.java | 8 ++---
 .../JavaStatefulNetworkWordCount.java | 5 +--
 .../JavaTwitterHashTagJoinSentiments.java | 5 +--
 .../java/org/apache/spark/Java8APISuite.java | 2 +-
 .../streaming/JavaKinesisWordCountASL.java | 9 ++++--
 project/MimaExcludes.scala | 31 +++++++++++++++++++
 .../scala/org/apache/spark/sql/Dataset.scala | 4 +--
 .../apache/spark/sql/JavaDatasetSuite.java | 25 +++++++--------
 .../streaming/api/java/JavaDStreamLike.scala | 10 +++---
 .../spark/streaming/dstream/DStream.scala | 2 +-
 .../streaming/dstream/FlatMappedDStream.scala | 2 +-
 .../apache/spark/streaming/JavaAPISuite.java | 20 ++++++------
 30 files changed, 146 insertions(+), 110 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
index 279639af5d..07aebb75e8 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
@@ -25,5 +25,5 @@ import java.util.Iterator;
  * Datasets.
  */
 public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
-  Iterable<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
+  Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
 }
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
index 57fd0a7a80..576087b6f4 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -18,10 +18,11 @@
 package org.apache.spark.api.java.function;
 
 import java.io.Serializable;
+import java.util.Iterator;
 
 /**
  * A function that returns zero or more records of type Double from each input record.
*/ public interface DoubleFlatMapFunction<T> extends Serializable { - public Iterable<Double> call(T t) throws Exception; + Iterator<Double> call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java index ef0d182412..2d8ea6d1a5 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java @@ -18,10 +18,11 @@ package org.apache.spark.api.java.function; import java.io.Serializable; +import java.util.Iterator; /** * A function that returns zero or more output records from each input record. */ public interface FlatMapFunction<T, R> extends Serializable { - Iterable<R> call(T t) throws Exception; + Iterator<R> call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java index 14a98a38ef..fc97b63f82 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java @@ -18,10 +18,11 @@ package org.apache.spark.api.java.function; import java.io.Serializable; +import java.util.Iterator; /** * A function that takes two inputs and returns zero or more output records. */ public interface FlatMapFunction2<T1, T2, R> extends Serializable { - Iterable<R> call(T1 t1, T2 t2) throws Exception; + Iterator<R> call(T1 t1, T2 t2) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java index d7a80e7b12..bae574ab57 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java @@ -24,5 +24,5 @@ import java.util.Iterator; * A function that returns zero or more output records from each grouping key and its values. */ public interface FlatMapGroupsFunction<K, V, R> extends Serializable { - Iterable<R> call(K key, Iterator<V> values) throws Exception; + Iterator<R> call(K key, Iterator<V> values) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java index 6cb569ce0c..cf9945a215 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java @@ -24,5 +24,5 @@ import java.util.Iterator; * Base interface for function used in Dataset's mapPartitions. 
*/ public interface MapPartitionsFunction<T, U> extends Serializable { - Iterable<U> call(Iterator<T> input) throws Exception; + Iterator<U> call(Iterator<T> input) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java index 691ef2eceb..51eed2e67b 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java @@ -18,6 +18,7 @@ package org.apache.spark.api.java.function; import java.io.Serializable; +import java.util.Iterator; import scala.Tuple2; @@ -26,5 +27,5 @@ import scala.Tuple2; * key-value pairs are represented as scala.Tuple2 objects. */ public interface PairFlatMapFunction<T, K, V> extends Serializable { - public Iterable<Tuple2<K, V>> call(T t) throws Exception; + Iterator<Tuple2<K, V>> call(T t) throws Exception; } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 0f8d13cf5c..7340defabf 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -121,7 +121,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * RDD, and then flattening the results. */ def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = { - def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala + def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U]) } @@ -130,7 +130,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * RDD, and then flattening the results. */ def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = { - def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala + def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue())) } @@ -139,7 +139,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * RDD, and then flattening the results. 
*/ def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { - def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala + def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]] JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2]) } @@ -149,7 +149,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = { def fn: (Iterator[T]) => Iterator[U] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U]) } @@ -160,7 +160,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = { def fn: (Iterator[T]) => Iterator[U] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } JavaRDD.fromRDD( rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U]) @@ -171,7 +171,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = { def fn: (Iterator[T]) => Iterator[jl.Double] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue())) } @@ -182,7 +182,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]): JavaPairRDD[K2, V2] = { def fn: (Iterator[T]) => Iterator[(K2, V2)] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2]) } @@ -193,7 +193,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = { def fn: (Iterator[T]) => Iterator[jl.Double] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning) .map(x => x.doubleValue())) @@ -205,7 +205,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = { def fn: (Iterator[T]) => Iterator[(K2, V2)] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } JavaPairRDD.fromRDD( rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2]) @@ -290,7 +290,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { other: JavaRDDLike[U, _], f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = { def fn: (Iterator[T], Iterator[U]) => Iterator[V] = { - (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).iterator().asScala + (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala } JavaRDD.fromRDD( rdd.zipPartitions(other.rdd)(fn)(other.classTag, 
fakeClassTag[V]))(fakeClassTag[V]) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 44d5cac7c2..8117ad9e60 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -880,8 +880,8 @@ public class JavaAPISuite implements Serializable { "The quick brown fox jumps over the lazy dog.")); JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String x) { - return Arrays.asList(x.split(" ")); + public Iterator<String> call(String x) { + return Arrays.asList(x.split(" ")).iterator(); } }); Assert.assertEquals("Hello", words.first()); @@ -890,12 +890,12 @@ public class JavaAPISuite implements Serializable { JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair( new PairFlatMapFunction<String, String, String>() { @Override - public Iterable<Tuple2<String, String>> call(String s) { + public Iterator<Tuple2<String, String>> call(String s) { List<Tuple2<String, String>> pairs = new LinkedList<>(); for (String word : s.split(" ")) { pairs.add(new Tuple2<>(word, word)); } - return pairs; + return pairs.iterator(); } } ); @@ -904,12 +904,12 @@ public class JavaAPISuite implements Serializable { JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() { @Override - public Iterable<Double> call(String s) { + public Iterator<Double> call(String s) { List<Double> lengths = new LinkedList<>(); for (String word : s.split(" ")) { lengths.add((double) word.length()); } - return lengths; + return lengths.iterator(); } }); Assert.assertEquals(5.0, doubles.first(), 0.01); @@ -930,8 +930,8 @@ public class JavaAPISuite implements Serializable { JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair( new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() { @Override - public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) { - return Collections.singletonList(item.swap()); + public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) { + return Collections.singletonList(item.swap()).iterator(); } }); swapped.collect(); @@ -951,12 +951,12 @@ public class JavaAPISuite implements Serializable { JavaRDD<Integer> partitionSums = rdd.mapPartitions( new FlatMapFunction<Iterator<Integer>, Integer>() { @Override - public Iterable<Integer> call(Iterator<Integer> iter) { + public Iterator<Integer> call(Iterator<Integer> iter) { int sum = 0; while (iter.hasNext()) { sum += iter.next(); } - return Collections.singletonList(sum); + return Collections.singletonList(sum).iterator(); } }); Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); @@ -1367,8 +1367,8 @@ public class JavaAPISuite implements Serializable { FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn = new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() { @Override - public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) { - return Arrays.asList(Iterators.size(i), Iterators.size(s)); + public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) { + return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator(); } }; diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 93c34efb66..7e681b67cf 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -165,8 +165,8 @@ space into words. 
// Split each line into words JavaDStream<String> words = lines.flatMap( new FlatMapFunction<String, String>() { - @Override public Iterable<String> call(String x) { - return Arrays.asList(x.split(" ")); + @Override public Iterator<String> call(String x) { + return Arrays.asList(x.split(" ")).iterator(); } }); {% endhighlight %} diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java index a5db8accdf..635fb6a373 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java @@ -17,7 +17,10 @@ package org.apache.spark.examples; - +import java.util.ArrayList; +import java.util.List; +import java.util.Iterator; +import java.util.regex.Pattern; import scala.Tuple2; @@ -32,11 +35,6 @@ import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; -import java.util.ArrayList; -import java.util.List; -import java.util.Iterator; -import java.util.regex.Pattern; - /** * Computes the PageRank of URLs from an input file. Input file should * be in format of: @@ -108,13 +106,13 @@ public final class JavaPageRank { JavaPairRDD<String, Double> contribs = links.join(ranks).values() .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() { @Override - public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) { + public Iterator<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) { int urlCount = Iterables.size(s._1); - List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>(); + List<Tuple2<String, Double>> results = new ArrayList<>(); for (String n : s._1) { - results.add(new Tuple2<String, Double>(n, s._2() / urlCount)); + results.add(new Tuple2<>(n, s._2() / urlCount)); } - return results; + return results.iterator(); } }); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java index 9a6a944f7e..d746a3d2b6 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java @@ -27,6 +27,7 @@ import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; @@ -46,8 +47,8 @@ public final class JavaWordCount { JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String s) { - return Arrays.asList(SPACE.split(s)); + public Iterator<String> call(String s) { + return Arrays.asList(SPACE.split(s)).iterator(); } }); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java index 62e563380a..cf774667f6 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java @@ -18,6 +18,7 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; +import java.util.Iterator; import scala.Tuple2; @@ -120,8 +121,8 @@ public class JavaActorWordCount { // compute wordcount lines.flatMap(new 
FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String s) { - return Arrays.asList(s.split("\\s+")); + public Iterator<String> call(String s) { + return Arrays.asList(s.split("\\s+")).iterator(); } }).mapToPair(new PairFunction<String, String, Integer>() { @Override diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java index 4b50fbf59f..3d668adcf8 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java @@ -17,7 +17,6 @@ package org.apache.spark.examples.streaming; -import com.google.common.collect.Lists; import com.google.common.io.Closeables; import org.apache.spark.SparkConf; @@ -37,6 +36,8 @@ import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.ConnectException; import java.net.Socket; +import java.util.Arrays; +import java.util.Iterator; import java.util.regex.Pattern; /** @@ -74,8 +75,8 @@ public class JavaCustomReceiver extends Receiver<String> { new JavaCustomReceiver(args[0], Integer.parseInt(args[1]))); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String x) { - return Lists.newArrayList(SPACE.split(x)); + public Iterator<String> call(String x) { + return Arrays.asList(SPACE.split(x)).iterator(); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java index f9a5e7f69f..5107500a12 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -20,11 +20,11 @@ package org.apache.spark.examples.streaming; import java.util.HashMap; import java.util.HashSet; import java.util.Arrays; +import java.util.Iterator; import java.util.regex.Pattern; import scala.Tuple2; -import com.google.common.collect.Lists; import kafka.serializer.StringDecoder; import org.apache.spark.SparkConf; @@ -87,8 +87,8 @@ public final class JavaDirectKafkaWordCount { }); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String x) { - return Lists.newArrayList(SPACE.split(x)); + public Iterator<String> call(String x) { + return Arrays.asList(SPACE.split(x)).iterator(); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java index 337f8ffb5b..0df4cb40a9 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java @@ -17,20 +17,19 @@ package org.apache.spark.examples.streaming; +import java.util.Arrays; +import java.util.Iterator; import java.util.Map; import java.util.HashMap; import java.util.regex.Pattern; - import scala.Tuple2; -import com.google.common.collect.Lists; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.examples.streaming.StreamingExamples; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; @@ -88,8 +87,8 @@ public final class JavaKafkaWordCount { JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String x) { - return Lists.newArrayList(SPACE.split(x)); + public Iterator<String> call(String x) { + return Arrays.asList(SPACE.split(x)).iterator(); } }); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java index 3e9f0f4b8f..b82b319acb 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java @@ -17,8 +17,11 @@ package org.apache.spark.examples.streaming; +import java.util.Arrays; +import java.util.Iterator; +import java.util.regex.Pattern; + import scala.Tuple2; -import com.google.common.collect.Lists; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; @@ -31,8 +34,6 @@ import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import java.util.regex.Pattern; - /** * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. * @@ -67,8 +68,8 @@ public final class JavaNetworkWordCount { args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String x) { - return Lists.newArrayList(SPACE.split(x)); + public Iterator<String> call(String x) { + return Arrays.asList(SPACE.split(x)).iterator(); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java index bc963a02be..bc8cbcdef7 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java @@ -21,11 +21,11 @@ import java.io.File; import java.io.IOException; import java.nio.charset.Charset; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; import scala.Tuple2; -import com.google.common.collect.Lists; import com.google.common.io.Files; import org.apache.spark.Accumulator; @@ -138,8 +138,8 @@ public final class JavaRecoverableNetworkWordCount { JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String x) { - return Lists.newArrayList(SPACE.split(x)); + public Iterator<String> call(String x) { + return Arrays.asList(SPACE.split(x)).iterator(); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java index 084f68a8be..f0228f5e63 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java @@ -17,10 +17,10 @@ package org.apache.spark.examples.streaming; +import java.util.Arrays; +import java.util.Iterator; import java.util.regex.Pattern; -import com.google.common.collect.Lists; - import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; @@ -72,8 +72,8 @@ public final class JavaSqlNetworkWordCount { args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String x) { - return Lists.newArrayList(SPACE.split(x)); + public Iterator<String> call(String x) { + return Arrays.asList(SPACE.split(x)).iterator(); } }); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index f52cc7c205..6beab90f08 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -18,6 +18,7 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; @@ -73,8 +74,8 @@ public class JavaStatefulNetworkWordCount { JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String x) { - return Arrays.asList(SPACE.split(x)); + public Iterator<String> call(String x) { + return Arrays.asList(SPACE.split(x)).iterator(); } }); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java index d869768026..f0ae9a99ba 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java @@ -34,6 +34,7 @@ import scala.Tuple2; import twitter4j.Status; import java.util.Arrays; +import java.util.Iterator; import java.util.List; /** @@ -70,8 +71,8 @@ public class JavaTwitterHashTagJoinSentiments { JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() { @Override - public Iterable<String> call(Status s) { - return Arrays.asList(s.getText().split(" ")); + public Iterator<String> call(Status s) { + return Arrays.asList(s.getText().split(" ")).iterator(); } }); diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java index 27d494ce35..c0b58e713f 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java @@ -294,7 +294,7 @@ public class Java8APISuite implements Serializable { sizeS += 1; s.next(); } - return Arrays.asList(sizeI, sizeS); + return Arrays.asList(sizeI, sizeS).iterator(); }; 
JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn); Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java index 06e0ff28af..64e044aa8e 100644 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -16,7 +16,10 @@ */ package org.apache.spark.examples.streaming; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; @@ -38,7 +41,6 @@ import scala.Tuple2; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; -import com.google.common.collect.Lists; /** * Consumes messages from a Amazon Kinesis streams and does wordcount. @@ -154,8 +156,9 @@ public final class JavaKinesisWordCountASL { // needs to be public for access fr // Convert each line of Array[Byte] to String, and split into words JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() { @Override - public Iterable<String> call(byte[] line) { - return Lists.newArrayList(WORD_SEPARATOR.split(new String(line))); + public Iterator<String> call(byte[] line) { + String s = new String(line, StandardCharsets.UTF_8); + return Arrays.asList(WORD_SEPARATOR.split(s)).iterator(); } }); diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 501456b043..643bee6969 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -60,6 +60,37 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory") ) ++ + Seq( + // SPARK-3369 Fix Iterable/Iterator in Java API + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.api.java.function.FlatMapFunction.call"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.function.FlatMapFunction.call"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.api.java.function.DoubleFlatMapFunction.call"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.function.DoubleFlatMapFunction.call"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.api.java.function.FlatMapFunction2.call"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.function.FlatMapFunction2.call"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.api.java.function.PairFlatMapFunction.call"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.function.PairFlatMapFunction.call"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.api.java.function.CoGroupFunction.call"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.function.CoGroupFunction.call"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.api.java.function.MapPartitionsFunction.call"), + ProblemFilters.exclude[MissingMethodProblem]( + 
"org.apache.spark.api.java.function.MapPartitionsFunction.call"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "org.apache.spark.api.java.function.FlatMapGroupsFunction.call"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.function.FlatMapGroupsFunction.call") + ) ++ Seq( // SPARK-4819 replace Guava Optional ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.api.java.JavaSparkContext.getCheckpointDir"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index bd99c39957..f182270a08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -346,7 +346,7 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] = { - val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).iterator.asScala + val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).asScala mapPartitions(func)(encoder) } @@ -366,7 +366,7 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = { - val func: (T) => Iterable[U] = x => f.call(x).asScala + val func: (T) => Iterator[U] = x => f.call(x).asScala flatMap(func)(encoder) } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index 3c0f25a5dc..a6fb62c17d 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -111,24 +111,24 @@ public class JavaDatasetSuite implements Serializable { Dataset<String> parMapped = ds.mapPartitions(new MapPartitionsFunction<String, String>() { @Override - public Iterable<String> call(Iterator<String> it) throws Exception { - List<String> ls = new LinkedList<String>(); + public Iterator<String> call(Iterator<String> it) { + List<String> ls = new LinkedList<>(); while (it.hasNext()) { - ls.add(it.next().toUpperCase()); + ls.add(it.next().toUpperCase(Locale.ENGLISH)); } - return ls; + return ls.iterator(); } }, Encoders.STRING()); Assert.assertEquals(Arrays.asList("HELLO", "WORLD"), parMapped.collectAsList()); Dataset<String> flatMapped = ds.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String s) throws Exception { - List<String> ls = new LinkedList<String>(); + public Iterator<String> call(String s) { + List<String> ls = new LinkedList<>(); for (char c : s.toCharArray()) { ls.add(String.valueOf(c)); } - return ls; + return ls.iterator(); } }, Encoders.STRING()); Assert.assertEquals( @@ -192,12 +192,12 @@ public class JavaDatasetSuite implements Serializable { Dataset<String> flatMapped = grouped.flatMapGroups( new FlatMapGroupsFunction<Integer, String, String>() { @Override - public Iterable<String> call(Integer key, Iterator<String> values) throws Exception { + public Iterator<String> call(Integer key, Iterator<String> values) { StringBuilder sb = new StringBuilder(key.toString()); while (values.hasNext()) { sb.append(values.next()); } - return Collections.singletonList(sb.toString()); + return Collections.singletonList(sb.toString()).iterator(); } }, Encoders.STRING()); @@ -228,10 +228,7 @@ public class JavaDatasetSuite implements Serializable { grouped2, new CoGroupFunction<Integer, String, Integer, 
String>() { @Override - public Iterable<String> call( - Integer key, - Iterator<String> left, - Iterator<Integer> right) throws Exception { + public Iterator<String> call(Integer key, Iterator<String> left, Iterator<Integer> right) { StringBuilder sb = new StringBuilder(key.toString()); while (left.hasNext()) { sb.append(left.next()); @@ -240,7 +237,7 @@ public class JavaDatasetSuite implements Serializable { while (right.hasNext()) { sb.append(right.next()); } - return Collections.singletonList(sb.toString()); + return Collections.singletonList(sb.toString()).iterator(); } }, Encoders.STRING()); diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index a791a474c6..f10de485d0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -166,8 +166,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * and then flattening the results */ def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = { - import scala.collection.JavaConverters._ - def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala + def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U]) } @@ -176,8 +175,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * and then flattening the results */ def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { - import scala.collection.JavaConverters._ - def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala + def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala def cm: ClassTag[(K2, V2)] = fakeClassTag new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2]) } @@ -189,7 +187,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T */ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = { def fn: (Iterator[T]) => Iterator[U] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U]) } @@ -202,7 +200,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]) : JavaPairDStream[K2, V2] = { def fn: (Iterator[T]) => Iterator[(K2, V2)] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2]) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 1dfb4e7abc..db79eeab9c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -550,7 +550,7 @@ abstract class DStream[T: ClassTag] ( * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ - def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope { + def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope { 
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala index 96a444a7ba..d60a617978 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala @@ -25,7 +25,7 @@ import org.apache.spark.streaming.{Duration, Time} private[streaming] class FlatMappedDStream[T: ClassTag, U: ClassTag]( parent: DStream[T], - flatMapFunc: T => Traversable[U] + flatMapFunc: T => TraversableOnce[U] ) extends DStream[U](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 4dbcef2934..806cea24ca 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -271,12 +271,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> mapped = stream.mapPartitions( new FlatMapFunction<Iterator<String>, String>() { @Override - public Iterable<String> call(Iterator<String> in) { + public Iterator<String> call(Iterator<String> in) { StringBuilder out = new StringBuilder(); while (in.hasNext()) { out.append(in.next().toUpperCase(Locale.ENGLISH)); } - return Arrays.asList(out.toString()); + return Arrays.asList(out.toString()).iterator(); } }); JavaTestUtils.attachTestOutputStream(mapped); @@ -759,8 +759,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { @Override - public Iterable<String> call(String x) { - return Arrays.asList(x.split("(?!^)")); + public Iterator<String> call(String x) { + return Arrays.asList(x.split("(?!^)")).iterator(); } }); JavaTestUtils.attachTestOutputStream(flatMapped); @@ -846,12 +846,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair( new PairFlatMapFunction<String, Integer, String>() { @Override - public Iterable<Tuple2<Integer, String>> call(String in) { + public Iterator<Tuple2<Integer, String>> call(String in) { List<Tuple2<Integer, String>> out = new ArrayList<>(); for (String letter: in.split("(?!^)")) { out.add(new Tuple2<>(in.length(), letter)); } - return out; + return out.iterator(); } }); JavaTestUtils.attachTestOutputStream(flatMapped); @@ -1019,13 +1019,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair( new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() { @Override - public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) { + public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) { List<Tuple2<Integer, String>> out = new LinkedList<>(); while (in.hasNext()) { Tuple2<String, Integer> next = in.next(); out.add(next.swap()); } - return out; + return out.iterator(); } }); @@ -1089,12 +1089,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair( new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() { @Override - public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) { + public Iterator<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) { List<Tuple2<Integer, String>> out = new LinkedList<>(); for (Character s : in._1().toCharArray()) { out.add(new Tuple2<>(in._2(), s.toString())); } - return out; + return out.iterator(); } }); JavaTestUtils.attachTestOutputStream(flatMapped);
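
Migration note for user code (illustrative, not part of the patch): after this change, Java implementations of FlatMapFunction, PairFlatMapFunction, DoubleFlatMapFunction, CoGroupFunction, and the mapPartitions variants must return a java.util.Iterator instead of an Iterable. For code that already builds a collection, the fix is usually just appending .iterator(), as the example updates in this patch do. A minimal before/after sketch using the same word-splitting pattern as the patched examples; the class and method names here are hypothetical:

  import java.util.Arrays;
  import java.util.Iterator;

  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.function.FlatMapFunction;

  // Hypothetical helper class, for illustration only.
  public final class FlatMapMigrationSketch {

    // Before SPARK-3369 the contract was:
    //   public Iterable<String> call(String x) {
    //     return Arrays.asList(x.split(" "));
    //   }

    // After SPARK-3369 the contract returns an Iterator, so a
    // collection-based implementation simply appends .iterator().
    static JavaRDD<String> words(JavaRDD<String> lines) {
      return lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String x) {
          return Arrays.asList(x.split(" ")).iterator();
        }
      });
    }
  }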
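
The same contract change applies to mapPartitions and to the FlatMapFunction2 used by zipPartitions. One practical consequence, implied by the API shape rather than stated in the commit message: an Iterator result can be produced lazily, so a partition no longer has to be buffered into a List before being returned. A sketch of a lazily transforming mapPartitions under that assumption, again with hypothetical names:

  import java.util.Iterator;

  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.function.FlatMapFunction;

  // Hypothetical helper class, for illustration only.
  public final class MapPartitionsMigrationSketch {

    // Maps each line of a partition to its length without collecting
    // intermediate results into a List first.
    static JavaRDD<Integer> lengths(JavaRDD<String> lines) {
      return lines.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
        @Override
        public Iterator<Integer> call(final Iterator<String> in) {
          return new Iterator<Integer>() {
            @Override
            public boolean hasNext() {
              return in.hasNext();
            }

            @Override
            public Integer next() {
              return in.next().length();
            }

            @Override
            public void remove() {
              throw new UnsupportedOperationException();
            }
          };
        }
      });
    }
  }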