From d341b17c2a0a4fce04045e13fb4a3b0621296320 Mon Sep 17 00:00:00 2001
From: Sean Owen <sowen@cloudera.com>
Date: Wed, 4 Jun 2014 11:27:08 -0700
Subject: [PATCH] SPARK-1973. Add randomSplit to JavaRDD (with tests, and tidy
 Java tests)

I'd like to use randomSplit through the Java API, and would like to add a convenience wrapper for this method to JavaRDD. This is fairly trivial. (In fact, is the intent that JavaRDD not wrap every RDD method? and that sometimes users should just use JavaRDD.wrapRDD()?)

Along the way, I added tests for it, and also touched up the Java API test style and behavior. This is maybe the more useful part of this small change.

Author: Sean Owen <sowen@cloudera.com>
Author: Xiangrui Meng <meng@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Xiangrui Meng <meng@databricks.com>

Closes #919 from srowen/SPARK-1973 and squashes the following commits:

148cb7b [Sean Owen] Some final Java test polish, while we are at it
1fc3f3e [Xiangrui Meng] more cleaning on Java 8 tests
9ebc57f [Sean Owen] Use accumulator instead of temp files to test foreach
5efb0be [Sean Owen] Add Java randomSplit, and unit tests (including for sample)
5dcc158 [Sean Owen] Simplified Java 8 test with new language features, and fixed the name of MLB's greatest team
91a1769 [Sean Owen] Touch up minor style issues in existing Java API suite test
---
 .../org/apache/spark/api/java/JavaRDD.scala   |  22 +
 .../java/org/apache/spark/JavaAPISuite.java   | 193 ++++-----
 .../java/org/apache/spark/Java8APISuite.java  |  96 +++--
 .../apache/spark/streaming/Java8APISuite.java | 381 +++++++++---------
 4 files changed, 358 insertions(+), 334 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index dc698dea75..23d1371079 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -108,6 +108,28 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
     wrapRDD(rdd.sample(withReplacement, fraction, seed))
 
+
+  /**
+   * Randomly splits this RDD with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1
+   *
+   * @return split RDDs in an array
+   */
+  def randomSplit(weights: Array[Double]): Array[JavaRDD[T]] =
+    randomSplit(weights, Utils.random.nextLong)
+
+  /**
+   * Randomly splits this RDD with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1
+   * @param seed random seed
+   *
+   * @return split RDDs in an array
+   */
+  def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]] =
+    rdd.randomSplit(weights, seed).map(wrapRDD)
+
   /**
    * Return the union of this RDD and another one. Any identical elements will appear multiple
    * times (use `.distinct()` to eliminate them).
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index b78309f81c..50a6212911 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -23,6 +23,7 @@ import java.util.*;
 import scala.Tuple2;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
@@ -48,7 +49,6 @@ import org.apache.spark.partial.BoundedDouble;
 import org.apache.spark.partial.PartialResult;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.StatCounter;
-import org.apache.spark.util.Utils;
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -70,16 +70,6 @@ public class JavaAPISuite implements Serializable {
     sc = null;
   }
 
-  static class ReverseIntComparator implements Comparator<Integer>, Serializable {
-
-    @Override
-    public int compare(Integer a, Integer b) {
-      if (a > b) return -1;
-      else if (a < b) return 1;
-      else return 0;
-    }
-  }
-
   @SuppressWarnings("unchecked")
   @Test
   public void sparkContextUnion() {
@@ -124,7 +114,7 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Integer> intersections = s1.intersection(s2);
     Assert.assertEquals(3, intersections.count());
 
-    ArrayList<Integer> list = new ArrayList<Integer>();
+    List<Integer> list = new ArrayList<Integer>();
     JavaRDD<Integer> empty = sc.parallelize(list);
     JavaRDD<Integer> emptyIntersection = empty.intersection(s2);
     Assert.assertEquals(0, emptyIntersection.count());
@@ -144,6 +134,28 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2, pIntersection.count());
   }
 
+  @Test
+  public void sample() {
+    List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    JavaRDD<Integer> rdd = sc.parallelize(ints);
+    JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 11);
+    // expected 2 but of course result varies randomly a bit
+    Assert.assertEquals(3, sample20.count());
+    JavaRDD<Integer> sample20NoReplacement = rdd.sample(false, 0.2, 11);
+    Assert.assertEquals(2, sample20NoReplacement.count());
+  }
+
+  @Test
+  public void randomSplit() {
+    List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    JavaRDD<Integer> rdd = sc.parallelize(ints);
+    JavaRDD<Integer>[] splits = rdd.randomSplit(new double[] { 0.4, 0.6, 1.0 }, 11);
+    Assert.assertEquals(3, splits.length);
+    Assert.assertEquals(2, splits[0].count());
+    Assert.assertEquals(3, splits[1].count());
+    Assert.assertEquals(5, splits[2].count());
+  }
+
   @Test
   public void sortByKey() {
     List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
@@ -161,26 +173,24 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
 
     // Custom comparator
-    sortedRDD = rdd.sortByKey(new ReverseIntComparator(), false);
+    sortedRDD = rdd.sortByKey(Collections.<Integer>reverseOrder(), false);
     Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
     sortedPairs = sortedRDD.collect();
     Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
     Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
   }
 
-  static int foreachCalls = 0;
-
   @Test
   public void foreach() {
-    foreachCalls = 0;
+    final Accumulator<Integer> accum = sc.accumulator(0);
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
     rdd.foreach(new VoidFunction<String>() {
       @Override
-      public void call(String s) {
-        foreachCalls++;
+      public void call(String s) throws IOException {
+        accum.add(1);
       }
     });
-    Assert.assertEquals(2, foreachCalls);
+    Assert.assertEquals(2, accum.value().intValue());
   }
 
   @Test
@@ -188,7 +198,7 @@ public class JavaAPISuite implements Serializable {
     List<Integer> correct = Arrays.asList(1, 2, 3, 4);
     JavaRDD<Integer> rdd = sc.parallelize(correct);
     List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
-    Assert.assertTrue(correct.equals(result));
+    Assert.assertEquals(correct, result);
   }
 
   @Test
@@ -196,7 +206,7 @@ public class JavaAPISuite implements Serializable {
     List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
     JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId();
     JavaRDD<Long> indexes = zip.values();
-    Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
+    Assert.assertEquals(4, new HashSet<Long>(indexes.collect()).size());
   }
 
   @Test
@@ -205,7 +215,7 @@ public class JavaAPISuite implements Serializable {
     JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithIndex();
     JavaRDD<Long> indexes = zip.values();
     List<Long> correctIndexes = Arrays.asList(0L, 1L, 2L, 3L);
-    Assert.assertTrue(indexes.collect().equals(correctIndexes));
+    Assert.assertEquals(correctIndexes, indexes.collect());
   }
 
   @SuppressWarnings("unchecked")
@@ -252,8 +262,10 @@ public class JavaAPISuite implements Serializable {
       new Tuple2<String, Integer>("Oranges", 2),
       new Tuple2<String, Integer>("Apples", 3)
     ));
-    JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = categories.cogroup(prices);
-    Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+    JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped =
+        categories.cogroup(prices);
+    Assert.assertEquals("[Fruit, Citrus]",
+                        Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
     Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
 
     cogrouped.collect();
@@ -281,8 +293,7 @@ public class JavaAPISuite implements Serializable {
       rdd1.leftOuterJoin(rdd2).filter(
         new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
           @Override
-          public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup)
-            throws Exception {
+          public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup) {
             return !tup._2()._2().isPresent();
           }
       }).first();
@@ -356,8 +367,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2, localCounts.get(2).intValue());
     Assert.assertEquals(3, localCounts.get(3).intValue());
 
-   localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer,
-      Integer>() {
+    localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer, Integer>() {
       @Override
       public Integer call(Integer a, Integer b) {
         return a + b;
@@ -448,16 +458,17 @@ public class JavaAPISuite implements Serializable {
     JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
       @Override
       public double call(Integer x) {
-        return 1.0 * x;
+        return x.doubleValue();
       }
     }).cache();
     doubles.collect();
-    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer x) {
-        return new Tuple2<Integer, Integer>(x, x);
-      }
-    }).cache();
+    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(
+        new PairFunction<Integer, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Integer x) {
+            return new Tuple2<Integer, Integer>(x, x);
+          }
+        }).cache();
     pairs.collect();
     JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
       @Override
@@ -487,7 +498,9 @@ public class JavaAPISuite implements Serializable {
         @Override
         public Iterable<Tuple2<String, String>> call(String s) {
           List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>();
-          for (String word : s.split(" ")) pairs.add(new Tuple2<String, String>(word, word));
+          for (String word : s.split(" ")) {
+            pairs.add(new Tuple2<String, String>(word, word));
+          }
           return pairs;
         }
       }
@@ -499,7 +512,9 @@ public class JavaAPISuite implements Serializable {
       @Override
       public Iterable<Double> call(String s) {
         List<Double> lengths = new LinkedList<Double>();
-        for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
+        for (String word : s.split(" ")) {
+          lengths.add((double) word.length());
+        }
         return lengths;
       }
     });
@@ -521,7 +536,7 @@ public class JavaAPISuite implements Serializable {
       JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
           new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
           @Override
-          public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
+          public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
               return Collections.singletonList(item.swap());
           }
       });
@@ -530,7 +545,7 @@ public class JavaAPISuite implements Serializable {
       // There was never a bug here, but it's worth testing:
       pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
           @Override
-          public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
+          public Tuple2<String, Integer> call(Tuple2<Integer, String> item) {
               return item.swap();
           }
       }).collect();
@@ -631,14 +646,10 @@ public class JavaAPISuite implements Serializable {
     byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8");
 
     String tempDirName = tempDir.getAbsolutePath();
-    DataOutputStream ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00000"));
-    ds.write(content1);
-    ds.close();
-    ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00001"));
-    ds.write(content2);
-    ds.close();
-
-    HashMap<String, String> container = new HashMap<String, String>();
+    Files.write(content1, new File(tempDirName + "/part-00000"));
+    Files.write(content2, new File(tempDirName + "/part-00001"));
+
+    Map<String, String> container = new HashMap<String, String>();
     container.put(tempDirName+"/part-00000", new Text(content1).toString());
     container.put(tempDirName+"/part-00001", new Text(content2).toString());
 
@@ -844,7 +855,7 @@ public class JavaAPISuite implements Serializable {
     JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
       @Override
       public double call(Integer x) {
-        return 1.0 * x;
+        return x.doubleValue();
       }
     });
     JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
@@ -859,17 +870,7 @@ public class JavaAPISuite implements Serializable {
       new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
         @Override
         public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) {
-          int sizeI = 0;
-          int sizeS = 0;
-          while (i.hasNext()) {
-            sizeI += 1;
-            i.next();
-          }
-          while (s.hasNext()) {
-            sizeS += 1;
-            s.next();
-          }
-          return Arrays.asList(sizeI, sizeS);
+          return Arrays.asList(Iterators.size(i), Iterators.size(s));
         }
       };
 
@@ -883,6 +884,7 @@ public class JavaAPISuite implements Serializable {
 
     final Accumulator<Integer> intAccum = sc.intAccumulator(10);
     rdd.foreach(new VoidFunction<Integer>() {
+      @Override
       public void call(Integer x) {
         intAccum.add(x);
       }
@@ -891,6 +893,7 @@ public class JavaAPISuite implements Serializable {
 
     final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
     rdd.foreach(new VoidFunction<Integer>() {
+      @Override
       public void call(Integer x) {
         doubleAccum.add((double) x);
       }
@@ -899,14 +902,17 @@ public class JavaAPISuite implements Serializable {
 
     // Try a custom accumulator type
     AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+      @Override
       public Float addInPlace(Float r, Float t) {
         return r + t;
       }
 
+      @Override
       public Float addAccumulator(Float r, Float t) {
         return r + t;
       }
 
+      @Override
       public Float zero(Float initialValue) {
         return 0.0f;
       }
@@ -914,6 +920,7 @@ public class JavaAPISuite implements Serializable {
 
     final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
     rdd.foreach(new VoidFunction<Integer>() {
+      @Override
       public void call(Integer x) {
         floatAccum.add((float) x);
       }
@@ -929,7 +936,8 @@ public class JavaAPISuite implements Serializable {
   public void keyBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
     List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
-      public String call(Integer t) throws Exception {
+      @Override
+      public String call(Integer t) {
         return t.toString();
       }
     }).collect();
@@ -941,10 +949,10 @@ public class JavaAPISuite implements Serializable {
   public void checkpointAndComputation() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     sc.setCheckpointDir(tempDir.getAbsolutePath());
-    Assert.assertEquals(false, rdd.isCheckpointed());
+    Assert.assertFalse(rdd.isCheckpointed());
     rdd.checkpoint();
     rdd.count(); // Forces the DAG to cause a checkpoint
-    Assert.assertEquals(true, rdd.isCheckpointed());
+    Assert.assertTrue(rdd.isCheckpointed());
     Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
   }
 
@@ -952,10 +960,10 @@ public class JavaAPISuite implements Serializable {
   public void checkpointAndRestore() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     sc.setCheckpointDir(tempDir.getAbsolutePath());
-    Assert.assertEquals(false, rdd.isCheckpointed());
+    Assert.assertFalse(rdd.isCheckpointed());
     rdd.checkpoint();
     rdd.count(); // Forces the DAG to cause a checkpoint
-    Assert.assertEquals(true, rdd.isCheckpointed());
+    Assert.assertTrue(rdd.isCheckpointed());
 
     Assert.assertTrue(rdd.getCheckpointFile().isPresent());
     JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
@@ -966,16 +974,17 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void mapOnPairRDD() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
-        return new Tuple2<Integer, Integer>(i, i % 2);
-      }
-    });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
+        new PairFunction<Integer, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Integer i) {
+            return new Tuple2<Integer, Integer>(i, i % 2);
+          }
+        });
     JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(
         new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
       @Override
-      public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
+      public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) {
         return new Tuple2<Integer, Integer>(in._2(), in._1());
       }
     });
@@ -992,14 +1001,15 @@ public class JavaAPISuite implements Serializable {
   public void collectPartitions() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
 
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
-        return new Tuple2<Integer, Integer>(i, i % 2);
-      }
-    });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
+        new PairFunction<Integer, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Integer i) {
+            return new Tuple2<Integer, Integer>(i, i % 2);
+          }
+        });
 
-    List[] parts = rdd1.collectPartitions(new int[] {0});
+    List<Integer>[] parts = rdd1.collectPartitions(new int[] {0});
     Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
 
     parts = rdd1.collectPartitions(new int[] {1, 2});
@@ -1010,14 +1020,14 @@ public class JavaAPISuite implements Serializable {
                                       new Tuple2<Integer, Integer>(2, 0)),
                         rdd2.collectPartitions(new int[] {0})[0]);
 
-    parts = rdd2.collectPartitions(new int[] {1, 2});
+    List<Tuple2<Integer,Integer>>[] parts2 = rdd2.collectPartitions(new int[] {1, 2});
     Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
                                       new Tuple2<Integer, Integer>(4, 0)),
-                        parts[0]);
+                        parts2[0]);
     Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
                                       new Tuple2<Integer, Integer>(6, 0),
                                       new Tuple2<Integer, Integer>(7, 1)),
-                        parts[1]);
+                        parts2[1]);
   }
 
   @Test
@@ -1034,10 +1044,12 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void countApproxDistinctByKey() {
     List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
-    for (int i = 10; i < 100; i++)
-      for (int j = 0; j < i; j++)
+    for (int i = 10; i < 100; i++) {
+      for (int j = 0; j < i; j++) {
         arrayData.add(new Tuple2<Integer, Integer>(i, j));
-
+      }
+    }
+    double relativeSD = 0.001;
     JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
     List<Tuple2<Integer, Object>> res =  pairRdd.countApproxDistinctByKey(8, 0).collect();
     for (Tuple2<Integer, Object> resItem : res) {
@@ -1053,12 +1065,13 @@ public class JavaAPISuite implements Serializable {
   public void collectAsMapWithIntArrayValues() {
     // Regression test for SPARK-1040
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
-    JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(new PairFunction<Integer, Integer, int[]>() {
-      @Override
-      public Tuple2<Integer, int[]> call(Integer x) throws Exception {
-        return new Tuple2<Integer, int[]>(x, new int[] { x });
-      }
-    });
+    JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(
+        new PairFunction<Integer, Integer, int[]>() {
+          @Override
+          public Tuple2<Integer, int[]> call(Integer x) {
+            return new Tuple2<Integer, int[]>(x, new int[] { x });
+          }
+        });
     pairRDD.collect();  // Works fine
     pairRDD.collectAsMap();  // Used to crash with ClassCastException
   }
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index c366c10b15..729bc0459c 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -99,16 +99,16 @@ public class Java8APISuite implements Serializable {
   @Test
   public void leftOuterJoin() {
     JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(1, 2),
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(3, 1)
+      new Tuple2<>(1, 1),
+      new Tuple2<>(1, 2),
+      new Tuple2<>(2, 1),
+      new Tuple2<>(3, 1)
     ));
     JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<Integer, Character>(1, 'x'),
-      new Tuple2<Integer, Character>(2, 'y'),
-      new Tuple2<Integer, Character>(2, 'z'),
-      new Tuple2<Integer, Character>(4, 'w')
+      new Tuple2<>(1, 'x'),
+      new Tuple2<>(2, 'y'),
+      new Tuple2<>(2, 'z'),
+      new Tuple2<>(4, 'w')
     ));
     List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
       rdd1.leftOuterJoin(rdd2).collect();
@@ -133,11 +133,11 @@ public class Java8APISuite implements Serializable {
   @Test
   public void foldByKey() {
     List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(3, 2),
-      new Tuple2<Integer, Integer>(3, 1)
+      new Tuple2<>(2, 1),
+      new Tuple2<>(2, 1),
+      new Tuple2<>(1, 1),
+      new Tuple2<>(3, 2),
+      new Tuple2<>(3, 1)
     );
     JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
     JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
@@ -149,11 +149,11 @@ public class Java8APISuite implements Serializable {
   @Test
   public void reduceByKey() {
     List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(3, 2),
-      new Tuple2<Integer, Integer>(3, 1)
+      new Tuple2<>(2, 1),
+      new Tuple2<>(2, 1),
+      new Tuple2<>(1, 1),
+      new Tuple2<>(3, 2),
+      new Tuple2<>(3, 1)
     );
     JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
     JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
@@ -177,7 +177,7 @@ public class Java8APISuite implements Serializable {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
     doubles.collect();
-    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x))
+    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x))
       .cache();
     pairs.collect();
     JavaRDD<String> strings = rdd.map(x -> x.toString()).cache();
@@ -194,31 +194,31 @@ public class Java8APISuite implements Serializable {
     Assert.assertEquals(11, words.count());
 
     JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
-      List<Tuple2<String, String>> pairs2 = new LinkedList<Tuple2<String, String>>();
-      for (String word : s.split(" ")) pairs2.add(new Tuple2<String, String>(word, word));
+      List<Tuple2<String, String>> pairs2 = new LinkedList<>();
+      for (String word : s.split(" ")) pairs2.add(new Tuple2<>(word, word));
       return pairs2;
     });
 
-    Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
+    Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first());
     Assert.assertEquals(11, pairs.count());
 
     JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
-      List<Double> lengths = new LinkedList<Double>();
+      List<Double> lengths = new LinkedList<>();
       for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
       return lengths;
     });
 
     Double x = doubles.first();
-    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+    Assert.assertEquals(5.0, doubles.first(), 0.01);
     Assert.assertEquals(11, pairs.count());
   }
 
   @Test
   public void mapsFromPairsToPairs() {
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
     );
     JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
 
@@ -251,19 +251,18 @@ public class Java8APISuite implements Serializable {
     tempDir.deleteOnExit();
     String outputDir = new File(tempDir, "output").getAbsolutePath();
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(pair ->
-      new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())))
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
       .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
 
     // Try reading the output back as an object file
     JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
-      .mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString()));
+      .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
     Assert.assertEquals(pairs, readRDD.collect());
     Utils.deleteRecursively(tempDir);
   }
@@ -325,7 +324,7 @@ public class Java8APISuite implements Serializable {
       }
     };
 
-    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+    final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
     rdd.foreach(x -> floatAccum.add((float) x));
     Assert.assertEquals((Float) 25.0f, floatAccum.value());
 
@@ -338,22 +337,22 @@ public class Java8APISuite implements Serializable {
   public void keyBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
     List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect();
-    Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
-    Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+    Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
+    Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
   }
 
   @Test
   public void mapOnPairRDD() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
     JavaPairRDD<Integer, Integer> rdd2 =
-      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+      rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
     JavaPairRDD<Integer, Integer> rdd3 =
-      rdd2.mapToPair(in -> new Tuple2<Integer, Integer>(in._2(), in._1()));
+      rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
     Assert.assertEquals(Arrays.asList(
       new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(0, 2),
-      new Tuple2<Integer, Integer>(1, 3),
-      new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+      new Tuple2<>(0, 2),
+      new Tuple2<>(1, 3),
+      new Tuple2<>(0, 4)), rdd3.collect());
   }
 
   @Test
@@ -361,7 +360,7 @@ public class Java8APISuite implements Serializable {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
 
     JavaPairRDD<Integer, Integer> rdd2 =
-      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+      rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
     List[] parts = rdd1.collectPartitions(new int[]{0});
     Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
 
@@ -369,16 +368,13 @@ public class Java8APISuite implements Serializable {
     Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
     Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
 
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(2, 0)),
+    Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
       rdd2.collectPartitions(new int[]{0})[0]);
 
     parts = rdd2.collectPartitions(new int[]{1, 2});
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
-      new Tuple2<Integer, Integer>(4, 0)), parts[0]);
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
-      new Tuple2<Integer, Integer>(6, 0),
-      new Tuple2<Integer, Integer>(7, 1)), parts[1]);
+    Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts[0]);
+    Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
+      parts[1]);
   }
 
   @Test
@@ -386,7 +382,7 @@ public class Java8APISuite implements Serializable {
     // Regression test for SPARK-1040
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1}));
     JavaPairRDD<Integer, int[]> pairRDD =
-      rdd.mapToPair(x -> new Tuple2<Integer, int[]>(x, new int[]{x}));
+      rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
     pairRDD.collect();  // Works fine
     Map<Integer, int[]> map = pairRDD.collectAsMap();  // Used to crash with ClassCastException
   }
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 43df0dea61..73091cfe2c 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -39,6 +39,7 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
  * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8
  * lambda syntax.
  */
+@SuppressWarnings("unchecked")
 public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
 
   @Test
@@ -52,7 +53,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
       Arrays.asList(9, 4));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> letterCount = stream.map(s -> s.length());
+    JavaDStream<Integer> letterCount = stream.map(String::length);
     JavaTestUtils.attachTestOutputStream(letterCount);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -63,7 +64,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testFilter() {
     List<List<String>> inputData = Arrays.asList(
       Arrays.asList("giants", "dodgers"),
-      Arrays.asList("yankees", "red socks"));
+      Arrays.asList("yankees", "red sox"));
 
     List<List<String>> expected = Arrays.asList(
       Arrays.asList("giants"),
@@ -81,11 +82,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testMapPartitions() {
     List<List<String>> inputData = Arrays.asList(
       Arrays.asList("giants", "dodgers"),
-      Arrays.asList("yankees", "red socks"));
+      Arrays.asList("yankees", "red sox"));
 
     List<List<String>> expected = Arrays.asList(
       Arrays.asList("GIANTSDODGERS"),
-      Arrays.asList("YANKEESRED SOCKS"));
+      Arrays.asList("YANKEESRED SOX"));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaDStream<String> mapped = stream.mapPartitions(in -> {
@@ -172,7 +173,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
 
     List<List<Tuple2<String, Integer>>> pairInputData =
-      Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+      Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
       JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
 
@@ -192,32 +193,32 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testTransformWith() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
       Arrays.asList(
-        new Tuple2<String, String>("california", "dodgers"),
-        new Tuple2<String, String>("new york", "yankees")),
+        new Tuple2<>("california", "dodgers"),
+        new Tuple2<>("new york", "yankees")),
       Arrays.asList(
-        new Tuple2<String, String>("california", "sharks"),
-        new Tuple2<String, String>("new york", "rangers")));
+        new Tuple2<>("california", "sharks"),
+        new Tuple2<>("new york", "rangers")));
 
     List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
       Arrays.asList(
-        new Tuple2<String, String>("california", "giants"),
-        new Tuple2<String, String>("new york", "mets")),
+        new Tuple2<>("california", "giants"),
+        new Tuple2<>("new york", "mets")),
       Arrays.asList(
-        new Tuple2<String, String>("california", "ducks"),
-        new Tuple2<String, String>("new york", "islanders")));
+        new Tuple2<>("california", "ducks"),
+        new Tuple2<>("new york", "islanders")));
 
 
-    List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+    List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
       Sets.newHashSet(
-        new Tuple2<String, Tuple2<String, String>>("california",
-          new Tuple2<String, String>("dodgers", "giants")),
-        new Tuple2<String, Tuple2<String, String>>("new york",
-          new Tuple2<String, String>("yankees", "mets"))),
+        new Tuple2<>("california",
+          new Tuple2<>("dodgers", "giants")),
+        new Tuple2<>("new york",
+          new Tuple2<>("yankees", "mets"))),
       Sets.newHashSet(
-        new Tuple2<String, Tuple2<String, String>>("california",
-          new Tuple2<String, String>("sharks", "ducks")),
-        new Tuple2<String, Tuple2<String, String>>("new york",
-          new Tuple2<String, String>("rangers", "islanders"))));
+        new Tuple2<>("california",
+          new Tuple2<>("sharks", "ducks")),
+        new Tuple2<>("new york",
+          new Tuple2<>("rangers", "islanders"))));
 
     JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
       ssc, stringStringKVStream1, 1);
@@ -232,7 +233,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     JavaTestUtils.attachTestOutputStream(joined);
     List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-    List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+    List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
     for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
       unorderedResult.add(Sets.newHashSet(res));
     }
@@ -251,9 +252,9 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
 
     List<List<Tuple2<String, Integer>>> pairInputData1 =
-      Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+      Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
     List<List<Tuple2<Double, Character>>> pairInputData2 =
-      Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+      Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
     JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
       JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
     JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
@@ -293,13 +294,13 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     );
 
     List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
-      Arrays.asList(new Tuple2<Integer, String>(1, "x")),
-      Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+      Arrays.asList(new Tuple2<>(1, "x")),
+      Arrays.asList(new Tuple2<>(2, "y"))
     );
 
     List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
-      Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+      Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
+      Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
     );
 
     JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
@@ -312,7 +313,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     // This is just to test whether this transform to JavaStream compiles
     JavaDStream<Long> transformed1 = ssc.transform(
       listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
-      assert (listOfRDDs.size() == 2);
+      Assert.assertEquals(2, listOfRDDs.size());
       return null;
     });
 
@@ -321,13 +322,13 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
       listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
-      assert (listOfRDDs.size() == 3);
+      Assert.assertEquals(3, listOfRDDs.size());
       JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
       JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
       JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
       JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
       PairFunction<Integer, Integer, Integer> mapToTuple =
-        (Integer i) -> new Tuple2<Integer, Integer>(i, i);
+        (Integer i) -> new Tuple2<>(i, i);
       return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
     });
     JavaTestUtils.attachTestOutputStream(transformed2);
@@ -365,36 +366,36 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, String>(6, "g"),
-        new Tuple2<Integer, String>(6, "i"),
-        new Tuple2<Integer, String>(6, "a"),
-        new Tuple2<Integer, String>(6, "n"),
-        new Tuple2<Integer, String>(6, "t"),
-        new Tuple2<Integer, String>(6, "s")),
+        new Tuple2<>(6, "g"),
+        new Tuple2<>(6, "i"),
+        new Tuple2<>(6, "a"),
+        new Tuple2<>(6, "n"),
+        new Tuple2<>(6, "t"),
+        new Tuple2<>(6, "s")),
       Arrays.asList(
-        new Tuple2<Integer, String>(7, "d"),
-        new Tuple2<Integer, String>(7, "o"),
-        new Tuple2<Integer, String>(7, "d"),
-        new Tuple2<Integer, String>(7, "g"),
-        new Tuple2<Integer, String>(7, "e"),
-        new Tuple2<Integer, String>(7, "r"),
-        new Tuple2<Integer, String>(7, "s")),
+        new Tuple2<>(7, "d"),
+        new Tuple2<>(7, "o"),
+        new Tuple2<>(7, "d"),
+        new Tuple2<>(7, "g"),
+        new Tuple2<>(7, "e"),
+        new Tuple2<>(7, "r"),
+        new Tuple2<>(7, "s")),
       Arrays.asList(
-        new Tuple2<Integer, String>(9, "a"),
-        new Tuple2<Integer, String>(9, "t"),
-        new Tuple2<Integer, String>(9, "h"),
-        new Tuple2<Integer, String>(9, "l"),
-        new Tuple2<Integer, String>(9, "e"),
-        new Tuple2<Integer, String>(9, "t"),
-        new Tuple2<Integer, String>(9, "i"),
-        new Tuple2<Integer, String>(9, "c"),
-        new Tuple2<Integer, String>(9, "s")));
+        new Tuple2<>(9, "a"),
+        new Tuple2<>(9, "t"),
+        new Tuple2<>(9, "h"),
+        new Tuple2<>(9, "l"),
+        new Tuple2<>(9, "e"),
+        new Tuple2<>(9, "t"),
+        new Tuple2<>(9, "i"),
+        new Tuple2<>(9, "c"),
+        new Tuple2<>(9, "s")));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
       List<Tuple2<Integer, String>> out = Lists.newArrayList();
       for (String letter : s.split("(?!^)")) {
-        out.add(new Tuple2<Integer, String>(s.length(), letter));
+        out.add(new Tuple2<>(s.length(), letter));
       }
       return out;
     });
@@ -411,12 +412,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
    */
   public static <T extends Comparable<T>> void assertOrderInvariantEquals(
     List<List<T>> expected, List<List<T>> actual) {
-    for (List<T> list : expected) {
-      Collections.sort(list);
-    }
-    for (List<T> list : actual) {
-      Collections.sort(list);
-    }
+    expected.forEach((List<T> list) -> Collections.sort(list));
+    actual.forEach((List<T> list) -> Collections.sort(list));
     Assert.assertEquals(expected, actual);
   }
 
@@ -424,11 +421,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testPairFilter() {
     List<List<String>> inputData = Arrays.asList(
       Arrays.asList("giants", "dodgers"),
-      Arrays.asList("yankees", "red socks"));
+      Arrays.asList("yankees", "red sox"));
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
-      Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+      Arrays.asList(new Tuple2<>("giants", 6)),
+      Arrays.asList(new Tuple2<>("yankees", 7)));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream =
@@ -441,26 +438,26 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   }
 
   List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
-    Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
-      new Tuple2<String, String>("california", "giants"),
-      new Tuple2<String, String>("new york", "yankees"),
-      new Tuple2<String, String>("new york", "mets")),
-    Arrays.asList(new Tuple2<String, String>("california", "sharks"),
-      new Tuple2<String, String>("california", "ducks"),
-      new Tuple2<String, String>("new york", "rangers"),
-      new Tuple2<String, String>("new york", "islanders")));
+    Arrays.asList(new Tuple2<>("california", "dodgers"),
+      new Tuple2<>("california", "giants"),
+      new Tuple2<>("new york", "yankees"),
+      new Tuple2<>("new york", "mets")),
+    Arrays.asList(new Tuple2<>("california", "sharks"),
+      new Tuple2<>("california", "ducks"),
+      new Tuple2<>("new york", "rangers"),
+      new Tuple2<>("new york", "islanders")));
 
   List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
     Arrays.asList(
-      new Tuple2<String, Integer>("california", 1),
-      new Tuple2<String, Integer>("california", 3),
-      new Tuple2<String, Integer>("new york", 4),
-      new Tuple2<String, Integer>("new york", 1)),
+      new Tuple2<>("california", 1),
+      new Tuple2<>("california", 3),
+      new Tuple2<>("new york", 4),
+      new Tuple2<>("new york", 1)),
     Arrays.asList(
-      new Tuple2<String, Integer>("california", 5),
-      new Tuple2<String, Integer>("california", 5),
-      new Tuple2<String, Integer>("new york", 3),
-      new Tuple2<String, Integer>("new york", 1)));
+      new Tuple2<>("california", 5),
+      new Tuple2<>("california", 5),
+      new Tuple2<>("new york", 3),
+      new Tuple2<>("new york", 1)));
 
   @Test
   public void testPairMap() { // Maps pair -> pair of different type
@@ -468,15 +465,15 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, String>(1, "california"),
-        new Tuple2<Integer, String>(3, "california"),
-        new Tuple2<Integer, String>(4, "new york"),
-        new Tuple2<Integer, String>(1, "new york")),
+        new Tuple2<>(1, "california"),
+        new Tuple2<>(3, "california"),
+        new Tuple2<>(4, "new york"),
+        new Tuple2<>(1, "new york")),
       Arrays.asList(
-        new Tuple2<Integer, String>(5, "california"),
-        new Tuple2<Integer, String>(5, "california"),
-        new Tuple2<Integer, String>(3, "new york"),
-        new Tuple2<Integer, String>(1, "new york")));
+        new Tuple2<>(5, "california"),
+        new Tuple2<>(5, "california"),
+        new Tuple2<>(3, "new york"),
+        new Tuple2<>(1, "new york")));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -494,21 +491,21 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, String>(1, "california"),
-        new Tuple2<Integer, String>(3, "california"),
-        new Tuple2<Integer, String>(4, "new york"),
-        new Tuple2<Integer, String>(1, "new york")),
+        new Tuple2<>(1, "california"),
+        new Tuple2<>(3, "california"),
+        new Tuple2<>(4, "new york"),
+        new Tuple2<>(1, "new york")),
       Arrays.asList(
-        new Tuple2<Integer, String>(5, "california"),
-        new Tuple2<Integer, String>(5, "california"),
-        new Tuple2<Integer, String>(3, "new york"),
-        new Tuple2<Integer, String>(1, "new york")));
+        new Tuple2<>(5, "california"),
+        new Tuple2<>(5, "california"),
+        new Tuple2<>(3, "new york"),
+        new Tuple2<>(1, "new york")));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
-      LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+      LinkedList<Tuple2<Integer, String>> out = new LinkedList<>();
       while (in.hasNext()) {
         Tuple2<String, Integer> next = in.next();
         out.add(next.swap());
@@ -530,7 +527,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
       Arrays.asList(1, 3, 4, 1),
       Arrays.asList(5, 5, 3, 1));
 
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
     JavaTestUtils.attachTestOutputStream(reversed);
@@ -543,31 +541,31 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
     List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
       Arrays.asList(
-        new Tuple2<String, Integer>("hi", 1),
-        new Tuple2<String, Integer>("ho", 2)),
+        new Tuple2<>("hi", 1),
+        new Tuple2<>("ho", 2)),
       Arrays.asList(
-        new Tuple2<String, Integer>("hi", 1),
-        new Tuple2<String, Integer>("ho", 2)));
+        new Tuple2<>("hi", 1),
+        new Tuple2<>("ho", 2)));
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, String>(1, "h"),
-        new Tuple2<Integer, String>(1, "i"),
-        new Tuple2<Integer, String>(2, "h"),
-        new Tuple2<Integer, String>(2, "o")),
+        new Tuple2<>(1, "h"),
+        new Tuple2<>(1, "i"),
+        new Tuple2<>(2, "h"),
+        new Tuple2<>(2, "o")),
       Arrays.asList(
-        new Tuple2<Integer, String>(1, "h"),
-        new Tuple2<Integer, String>(1, "i"),
-        new Tuple2<Integer, String>(2, "h"),
-        new Tuple2<Integer, String>(2, "o")));
+        new Tuple2<>(1, "h"),
+        new Tuple2<>(1, "i"),
+        new Tuple2<>(2, "h"),
+        new Tuple2<>(2, "o")));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
-      List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+      List<Tuple2<Integer, String>> out = new LinkedList<>();
       for (Character s : in._1().toCharArray()) {
-        out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+        out.add(new Tuple2<>(in._2(), s.toString()));
       }
       return out;
     });
@@ -584,11 +582,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<String, Integer>("california", 4),
-        new Tuple2<String, Integer>("new york", 5)),
+        new Tuple2<>("california", 4),
+        new Tuple2<>("new york", 5)),
       Arrays.asList(
-        new Tuple2<String, Integer>("california", 10),
-        new Tuple2<String, Integer>("new york", 4)));
+        new Tuple2<>("california", 10),
+        new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
       ssc, inputData, 1);
@@ -608,11 +606,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<String, Integer>("california", 4),
-        new Tuple2<String, Integer>("new york", 5)),
+        new Tuple2<>("california", 4),
+        new Tuple2<>("new york", 5)),
       Arrays.asList(
-        new Tuple2<String, Integer>("california", 10),
-        new Tuple2<String, Integer>("new york", 4)));
+        new Tuple2<>("california", 10),
+        new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
       ssc, inputData, 1);
@@ -632,12 +630,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, Integer>("california", 4),
-        new Tuple2<String, Integer>("new york", 5)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 14),
-        new Tuple2<String, Integer>("new york", 9)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 10),
-        new Tuple2<String, Integer>("new york", 4)));
+      Arrays.asList(new Tuple2<>("california", 4),
+        new Tuple2<>("new york", 5)),
+      Arrays.asList(new Tuple2<>("california", 14),
+        new Tuple2<>("new york", 9)),
+      Arrays.asList(new Tuple2<>("california", 10),
+        new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -656,12 +654,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, Integer>("california", 4),
-        new Tuple2<String, Integer>("new york", 5)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 14),
-        new Tuple2<String, Integer>("new york", 9)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 14),
-        new Tuple2<String, Integer>("new york", 9)));
+      Arrays.asList(new Tuple2<>("california", 4),
+        new Tuple2<>("new york", 5)),
+      Arrays.asList(new Tuple2<>("california", 14),
+        new Tuple2<>("new york", 9)),
+      Arrays.asList(new Tuple2<>("california", 14),
+        new Tuple2<>("new york", 9)));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -689,12 +687,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, Integer>("california", 4),
-        new Tuple2<String, Integer>("new york", 5)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 14),
-        new Tuple2<String, Integer>("new york", 9)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 10),
-        new Tuple2<String, Integer>("new york", 4)));
+      Arrays.asList(new Tuple2<>("california", 4),
+        new Tuple2<>("new york", 5)),
+      Arrays.asList(new Tuple2<>("california", 14),
+        new Tuple2<>("new york", 9)),
+      Arrays.asList(new Tuple2<>("california", 10),
+        new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -713,27 +711,27 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testPairTransform() {
     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(1, 5),
-        new Tuple2<Integer, Integer>(4, 5),
-        new Tuple2<Integer, Integer>(2, 5)),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(1, 5),
+        new Tuple2<>(4, 5),
+        new Tuple2<>(2, 5)),
       Arrays.asList(
-        new Tuple2<Integer, Integer>(2, 5),
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(4, 5),
-        new Tuple2<Integer, Integer>(1, 5)));
+        new Tuple2<>(2, 5),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(4, 5),
+        new Tuple2<>(1, 5)));
 
     List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, Integer>(1, 5),
-        new Tuple2<Integer, Integer>(2, 5),
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(4, 5)),
+        new Tuple2<>(1, 5),
+        new Tuple2<>(2, 5),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(4, 5)),
       Arrays.asList(
-        new Tuple2<Integer, Integer>(1, 5),
-        new Tuple2<Integer, Integer>(2, 5),
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(4, 5)));
+        new Tuple2<>(1, 5),
+        new Tuple2<>(2, 5),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(4, 5)));
 
     JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
       ssc, inputData, 1);
@@ -751,15 +749,15 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testPairToNormalRDDTransform() {
     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(1, 5),
-        new Tuple2<Integer, Integer>(4, 5),
-        new Tuple2<Integer, Integer>(2, 5)),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(1, 5),
+        new Tuple2<>(4, 5),
+        new Tuple2<>(2, 5)),
       Arrays.asList(
-        new Tuple2<Integer, Integer>(2, 5),
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(4, 5),
-        new Tuple2<Integer, Integer>(1, 5)));
+        new Tuple2<>(2, 5),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(4, 5),
+        new Tuple2<>(1, 5)));
 
     List<List<Integer>> expected = Arrays.asList(
       Arrays.asList(3, 1, 4, 2),
@@ -780,20 +778,20 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
 
     List<List<Tuple2<String, String>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
-        new Tuple2<String, String>("california", "GIANTS"),
-        new Tuple2<String, String>("new york", "YANKEES"),
-        new Tuple2<String, String>("new york", "METS")),
-      Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
-        new Tuple2<String, String>("california", "DUCKS"),
-        new Tuple2<String, String>("new york", "RANGERS"),
-        new Tuple2<String, String>("new york", "ISLANDERS")));
+      Arrays.asList(new Tuple2<>("california", "DODGERS"),
+        new Tuple2<>("california", "GIANTS"),
+        new Tuple2<>("new york", "YANKEES"),
+        new Tuple2<>("new york", "METS")),
+      Arrays.asList(new Tuple2<>("california", "SHARKS"),
+        new Tuple2<>("california", "DUCKS"),
+        new Tuple2<>("new york", "RANGERS"),
+        new Tuple2<>("new york", "ISLANDERS")));
 
     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
       ssc, inputData, 1);
     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase());
+    JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase);
     JavaTestUtils.attachTestOutputStream(mapped);
     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -805,34 +803,29 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
 
     List<List<Tuple2<String, String>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
-        new Tuple2<String, String>("california", "dodgers2"),
-        new Tuple2<String, String>("california", "giants1"),
-        new Tuple2<String, String>("california", "giants2"),
-        new Tuple2<String, String>("new york", "yankees1"),
-        new Tuple2<String, String>("new york", "yankees2"),
-        new Tuple2<String, String>("new york", "mets1"),
-        new Tuple2<String, String>("new york", "mets2")),
-      Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
-        new Tuple2<String, String>("california", "sharks2"),
-        new Tuple2<String, String>("california", "ducks1"),
-        new Tuple2<String, String>("california", "ducks2"),
-        new Tuple2<String, String>("new york", "rangers1"),
-        new Tuple2<String, String>("new york", "rangers2"),
-        new Tuple2<String, String>("new york", "islanders1"),
-        new Tuple2<String, String>("new york", "islanders2")));
+      Arrays.asList(new Tuple2<>("california", "dodgers1"),
+        new Tuple2<>("california", "dodgers2"),
+        new Tuple2<>("california", "giants1"),
+        new Tuple2<>("california", "giants2"),
+        new Tuple2<>("new york", "yankees1"),
+        new Tuple2<>("new york", "yankees2"),
+        new Tuple2<>("new york", "mets1"),
+        new Tuple2<>("new york", "mets2")),
+      Arrays.asList(new Tuple2<>("california", "sharks1"),
+        new Tuple2<>("california", "sharks2"),
+        new Tuple2<>("california", "ducks1"),
+        new Tuple2<>("california", "ducks2"),
+        new Tuple2<>("new york", "rangers1"),
+        new Tuple2<>("new york", "rangers2"),
+        new Tuple2<>("new york", "islanders1"),
+        new Tuple2<>("new york", "islanders2")));
 
     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
       ssc, inputData, 1);
     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-
-    JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
-      List<String> out = new ArrayList<String>();
-      out.add(in + "1");
-      out.add(in + "2");
-      return out;
-    });
+    JavaPairDStream<String, String> flatMapped =
+      pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2"));
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
     Assert.assertEquals(expected, result);
-- 
GitLab