diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index dc698dea75e436ec047126faa674de9d312fb4cb..23d13710794af158082f57319dcd624faf7d56f6 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -108,6 +108,28 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
     wrapRDD(rdd.sample(withReplacement, fraction, seed))
 
+
+  /**
+   * Randomly splits this RDD with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1
+   *
+   * @return split RDDs in an array
+   */
+  def randomSplit(weights: Array[Double]): Array[JavaRDD[T]] =
+    randomSplit(weights, Utils.random.nextLong)
+
+  /**
+   * Randomly splits this RDD with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1
+   * @param seed random seed
+   *
+   * @return split RDDs in an array
+   */
+  def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]] =
+    rdd.randomSplit(weights, seed).map(wrapRDD)
+
   /**
    * Return the union of this RDD and another one. Any identical elements will appear multiple
    * times (use `.distinct()` to eliminate them).
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index b78309f81cb8c9d82f8fa481105dd8b37fad3ce5..50a62129116f188e7e8aca9a3db2750ff00cf36d 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -23,6 +23,7 @@ import java.util.*;
 import scala.Tuple2;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
@@ -48,7 +49,6 @@ import org.apache.spark.partial.BoundedDouble;
 import org.apache.spark.partial.PartialResult;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.StatCounter;
-import org.apache.spark.util.Utils;
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -70,16 +70,6 @@ public class JavaAPISuite implements Serializable {
     sc = null;
   }
 
-  static class ReverseIntComparator implements Comparator<Integer>, Serializable {
-
-    @Override
-    public int compare(Integer a, Integer b) {
-      if (a > b) return -1;
-      else if (a < b) return 1;
-      else return 0;
-    }
-  }
-
   @SuppressWarnings("unchecked")
   @Test
   public void sparkContextUnion() {
@@ -124,7 +114,7 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Integer> intersections = s1.intersection(s2);
     Assert.assertEquals(3, intersections.count());
 
-    ArrayList<Integer> list = new ArrayList<Integer>();
+    List<Integer> list = new ArrayList<Integer>();
     JavaRDD<Integer> empty = sc.parallelize(list);
     JavaRDD<Integer> emptyIntersection = empty.intersection(s2);
     Assert.assertEquals(0, emptyIntersection.count());
@@ -144,6 +134,28 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2, pIntersection.count());
   }
 
+  @Test
+  public void sample() {
+    List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    JavaRDD<Integer> rdd = sc.parallelize(ints);
+    JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 11);
+    // expected 2 but of course result varies randomly a bit
+    Assert.assertEquals(3, sample20.count());
+    JavaRDD<Integer> sample20NoReplacement = rdd.sample(false, 0.2, 11);
+    Assert.assertEquals(2, sample20NoReplacement.count());
+  }
+
+  @Test
+  public void randomSplit() {
+    List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    JavaRDD<Integer> rdd = sc.parallelize(ints);
+    JavaRDD<Integer>[] splits = rdd.randomSplit(new double[] { 0.4, 0.6, 1.0 }, 11);
+    Assert.assertEquals(3, splits.length);
+    Assert.assertEquals(2, splits[0].count());
+    Assert.assertEquals(3, splits[1].count());
+    Assert.assertEquals(5, splits[2].count());
+  }
+
   @Test
   public void sortByKey() {
     List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
@@ -161,26 +173,24 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
 
     // Custom comparator
-    sortedRDD = rdd.sortByKey(new ReverseIntComparator(), false);
+    sortedRDD = rdd.sortByKey(Collections.<Integer>reverseOrder(), false);
     Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
     sortedPairs = sortedRDD.collect();
     Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
     Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
   }
 
-  static int foreachCalls = 0;
-
   @Test
   public void foreach() {
-    foreachCalls = 0;
+    final Accumulator<Integer> accum = sc.accumulator(0);
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
     rdd.foreach(new VoidFunction<String>() {
       @Override
-      public void call(String s) {
-        foreachCalls++;
+      public void call(String s) throws IOException {
+        accum.add(1);
       }
     });
-    Assert.assertEquals(2, foreachCalls);
+    Assert.assertEquals(2, accum.value().intValue());
   }
 
   @Test
@@ -188,7 +198,7 @@ public class JavaAPISuite implements Serializable {
     List<Integer> correct = Arrays.asList(1, 2, 3, 4);
     JavaRDD<Integer> rdd = sc.parallelize(correct);
     List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
-    Assert.assertTrue(correct.equals(result));
+    Assert.assertEquals(correct, result);
   }
 
   @Test
@@ -196,7 +206,7 @@ public class JavaAPISuite implements Serializable {
     List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
     JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId();
     JavaRDD<Long> indexes = zip.values();
-    Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
+    Assert.assertEquals(4, new HashSet<Long>(indexes.collect()).size());
   }
 
   @Test
@@ -205,7 +215,7 @@ public class JavaAPISuite implements Serializable {
     JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithIndex();
     JavaRDD<Long> indexes = zip.values();
     List<Long> correctIndexes = Arrays.asList(0L, 1L, 2L, 3L);
-    Assert.assertTrue(indexes.collect().equals(correctIndexes));
+    Assert.assertEquals(correctIndexes, indexes.collect());
   }
 
   @SuppressWarnings("unchecked")
@@ -252,8 +262,10 @@ public class JavaAPISuite implements Serializable {
       new Tuple2<String, Integer>("Oranges", 2),
       new Tuple2<String, Integer>("Apples", 3)
     ));
-    JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = categories.cogroup(prices);
-    Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+    JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped =
+        categories.cogroup(prices);
+    Assert.assertEquals("[Fruit, Citrus]",
+                        Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
     Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
 
     cogrouped.collect();
@@ -281,8 +293,7 @@ public class JavaAPISuite implements Serializable {
       rdd1.leftOuterJoin(rdd2).filter(
         new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
           @Override
-          public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup)
-            throws Exception {
+          public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup) {
             return !tup._2()._2().isPresent();
           }
       }).first();
@@ -356,8 +367,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2, localCounts.get(2).intValue());
     Assert.assertEquals(3, localCounts.get(3).intValue());
 
-   localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer,
-      Integer>() {
+    localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer, Integer>() {
       @Override
       public Integer call(Integer a, Integer b) {
         return a + b;
@@ -448,16 +458,17 @@ public class JavaAPISuite implements Serializable {
     JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
       @Override
       public double call(Integer x) {
-        return 1.0 * x;
+        return x.doubleValue();
       }
     }).cache();
     doubles.collect();
-    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer x) {
-        return new Tuple2<Integer, Integer>(x, x);
-      }
-    }).cache();
+    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(
+        new PairFunction<Integer, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Integer x) {
+            return new Tuple2<Integer, Integer>(x, x);
+          }
+        }).cache();
     pairs.collect();
     JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
       @Override
@@ -487,7 +498,9 @@ public class JavaAPISuite implements Serializable {
         @Override
         public Iterable<Tuple2<String, String>> call(String s) {
           List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>();
-          for (String word : s.split(" ")) pairs.add(new Tuple2<String, String>(word, word));
+          for (String word : s.split(" ")) {
+            pairs.add(new Tuple2<String, String>(word, word));
+          }
           return pairs;
         }
       }
@@ -499,7 +512,9 @@ public class JavaAPISuite implements Serializable {
       @Override
       public Iterable<Double> call(String s) {
         List<Double> lengths = new LinkedList<Double>();
-        for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
+        for (String word : s.split(" ")) {
+          lengths.add((double) word.length());
+        }
         return lengths;
       }
     });
@@ -521,7 +536,7 @@ public class JavaAPISuite implements Serializable {
       JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
           new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
           @Override
-          public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) throws Exception {
+          public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
               return Collections.singletonList(item.swap());
           }
       });
@@ -530,7 +545,7 @@ public class JavaAPISuite implements Serializable {
       // There was never a bug here, but it's worth testing:
       pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
           @Override
-          public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
+          public Tuple2<String, Integer> call(Tuple2<Integer, String> item) {
               return item.swap();
           }
       }).collect();
@@ -631,14 +646,10 @@ public class JavaAPISuite implements Serializable {
     byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8");
 
     String tempDirName = tempDir.getAbsolutePath();
-    DataOutputStream ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00000"));
-    ds.write(content1);
-    ds.close();
-    ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00001"));
-    ds.write(content2);
-    ds.close();
-
-    HashMap<String, String> container = new HashMap<String, String>();
+    Files.write(content1, new File(tempDirName + "/part-00000"));
+    Files.write(content2, new File(tempDirName + "/part-00001"));
+
+    Map<String, String> container = new HashMap<String, String>();
     container.put(tempDirName+"/part-00000", new Text(content1).toString());
     container.put(tempDirName+"/part-00001", new Text(content2).toString());
 
@@ -844,7 +855,7 @@ public class JavaAPISuite implements Serializable {
     JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {
       @Override
       public double call(Integer x) {
-        return 1.0 * x;
+        return x.doubleValue();
       }
     });
     JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
@@ -859,17 +870,7 @@ public class JavaAPISuite implements Serializable {
       new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
         @Override
         public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) {
-          int sizeI = 0;
-          int sizeS = 0;
-          while (i.hasNext()) {
-            sizeI += 1;
-            i.next();
-          }
-          while (s.hasNext()) {
-            sizeS += 1;
-            s.next();
-          }
-          return Arrays.asList(sizeI, sizeS);
+          return Arrays.asList(Iterators.size(i), Iterators.size(s));
         }
       };
 
@@ -883,6 +884,7 @@ public class JavaAPISuite implements Serializable {
 
     final Accumulator<Integer> intAccum = sc.intAccumulator(10);
     rdd.foreach(new VoidFunction<Integer>() {
+      @Override
       public void call(Integer x) {
         intAccum.add(x);
       }
@@ -891,6 +893,7 @@ public class JavaAPISuite implements Serializable {
 
     final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
     rdd.foreach(new VoidFunction<Integer>() {
+      @Override
       public void call(Integer x) {
         doubleAccum.add((double) x);
       }
@@ -899,14 +902,17 @@ public class JavaAPISuite implements Serializable {
 
     // Try a custom accumulator type
     AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+      @Override
       public Float addInPlace(Float r, Float t) {
         return r + t;
       }
 
+      @Override
       public Float addAccumulator(Float r, Float t) {
         return r + t;
       }
 
+      @Override
       public Float zero(Float initialValue) {
         return 0.0f;
       }
@@ -914,6 +920,7 @@ public class JavaAPISuite implements Serializable {
 
     final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
     rdd.foreach(new VoidFunction<Integer>() {
+      @Override
       public void call(Integer x) {
         floatAccum.add((float) x);
       }
@@ -929,7 +936,8 @@ public class JavaAPISuite implements Serializable {
   public void keyBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
     List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
-      public String call(Integer t) throws Exception {
+      @Override
+      public String call(Integer t) {
         return t.toString();
       }
     }).collect();
@@ -941,10 +949,10 @@ public class JavaAPISuite implements Serializable {
   public void checkpointAndComputation() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     sc.setCheckpointDir(tempDir.getAbsolutePath());
-    Assert.assertEquals(false, rdd.isCheckpointed());
+    Assert.assertFalse(rdd.isCheckpointed());
     rdd.checkpoint();
     rdd.count(); // Forces the DAG to cause a checkpoint
-    Assert.assertEquals(true, rdd.isCheckpointed());
+    Assert.assertTrue(rdd.isCheckpointed());
     Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
   }
 
@@ -952,10 +960,10 @@ public class JavaAPISuite implements Serializable {
   public void checkpointAndRestore() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     sc.setCheckpointDir(tempDir.getAbsolutePath());
-    Assert.assertEquals(false, rdd.isCheckpointed());
+    Assert.assertFalse(rdd.isCheckpointed());
     rdd.checkpoint();
     rdd.count(); // Forces the DAG to cause a checkpoint
-    Assert.assertEquals(true, rdd.isCheckpointed());
+    Assert.assertTrue(rdd.isCheckpointed());
 
     Assert.assertTrue(rdd.getCheckpointFile().isPresent());
     JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
@@ -966,16 +974,17 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void mapOnPairRDD() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
-        return new Tuple2<Integer, Integer>(i, i % 2);
-      }
-    });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
+        new PairFunction<Integer, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Integer i) {
+            return new Tuple2<Integer, Integer>(i, i % 2);
+          }
+        });
     JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(
         new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
       @Override
-      public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
+      public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) {
         return new Tuple2<Integer, Integer>(in._2(), in._1());
       }
     });
@@ -992,14 +1001,15 @@ public class JavaAPISuite implements Serializable {
   public void collectPartitions() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
 
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
-        return new Tuple2<Integer, Integer>(i, i % 2);
-      }
-    });
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
+        new PairFunction<Integer, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Integer i) {
+            return new Tuple2<Integer, Integer>(i, i % 2);
+          }
+        });
 
-    List[] parts = rdd1.collectPartitions(new int[] {0});
+    List<Integer>[] parts = rdd1.collectPartitions(new int[] {0});
     Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
 
     parts = rdd1.collectPartitions(new int[] {1, 2});
@@ -1010,14 +1020,14 @@ public class JavaAPISuite implements Serializable {
                                       new Tuple2<Integer, Integer>(2, 0)),
                         rdd2.collectPartitions(new int[] {0})[0]);
 
-    parts = rdd2.collectPartitions(new int[] {1, 2});
+    List<Tuple2<Integer,Integer>>[] parts2 = rdd2.collectPartitions(new int[] {1, 2});
     Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
                                       new Tuple2<Integer, Integer>(4, 0)),
-                        parts[0]);
+                        parts2[0]);
     Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
                                       new Tuple2<Integer, Integer>(6, 0),
                                       new Tuple2<Integer, Integer>(7, 1)),
-                        parts[1]);
+                        parts2[1]);
   }
 
   @Test
@@ -1034,10 +1044,12 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void countApproxDistinctByKey() {
     List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
-    for (int i = 10; i < 100; i++)
-      for (int j = 0; j < i; j++)
+    for (int i = 10; i < 100; i++) {
+      for (int j = 0; j < i; j++) {
         arrayData.add(new Tuple2<Integer, Integer>(i, j));
-
+      }
+    }
+    double relativeSD = 0.001;
     JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
     List<Tuple2<Integer, Object>> res =  pairRdd.countApproxDistinctByKey(8, 0).collect();
     for (Tuple2<Integer, Object> resItem : res) {
@@ -1053,12 +1065,13 @@ public class JavaAPISuite implements Serializable {
   public void collectAsMapWithIntArrayValues() {
     // Regression test for SPARK-1040
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
-    JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(new PairFunction<Integer, Integer, int[]>() {
-      @Override
-      public Tuple2<Integer, int[]> call(Integer x) throws Exception {
-        return new Tuple2<Integer, int[]>(x, new int[] { x });
-      }
-    });
+    JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(
+        new PairFunction<Integer, Integer, int[]>() {
+          @Override
+          public Tuple2<Integer, int[]> call(Integer x) {
+            return new Tuple2<Integer, int[]>(x, new int[] { x });
+          }
+        });
     pairRDD.collect();  // Works fine
     pairRDD.collectAsMap();  // Used to crash with ClassCastException
   }
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index c366c10b15a201ff37a213563fb447950cca49d9..729bc0459ce52f4d7f906a39c71810ae9b82d881 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -99,16 +99,16 @@ public class Java8APISuite implements Serializable {
   @Test
   public void leftOuterJoin() {
     JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(1, 2),
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(3, 1)
+      new Tuple2<>(1, 1),
+      new Tuple2<>(1, 2),
+      new Tuple2<>(2, 1),
+      new Tuple2<>(3, 1)
     ));
     JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<Integer, Character>(1, 'x'),
-      new Tuple2<Integer, Character>(2, 'y'),
-      new Tuple2<Integer, Character>(2, 'z'),
-      new Tuple2<Integer, Character>(4, 'w')
+      new Tuple2<>(1, 'x'),
+      new Tuple2<>(2, 'y'),
+      new Tuple2<>(2, 'z'),
+      new Tuple2<>(4, 'w')
     ));
     List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
       rdd1.leftOuterJoin(rdd2).collect();
@@ -133,11 +133,11 @@ public class Java8APISuite implements Serializable {
   @Test
   public void foldByKey() {
     List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(3, 2),
-      new Tuple2<Integer, Integer>(3, 1)
+      new Tuple2<>(2, 1),
+      new Tuple2<>(2, 1),
+      new Tuple2<>(1, 1),
+      new Tuple2<>(3, 2),
+      new Tuple2<>(3, 1)
     );
     JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
     JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
@@ -149,11 +149,11 @@ public class Java8APISuite implements Serializable {
   @Test
   public void reduceByKey() {
     List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(3, 2),
-      new Tuple2<Integer, Integer>(3, 1)
+      new Tuple2<>(2, 1),
+      new Tuple2<>(2, 1),
+      new Tuple2<>(1, 1),
+      new Tuple2<>(3, 2),
+      new Tuple2<>(3, 1)
     );
     JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
     JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
@@ -177,7 +177,7 @@ public class Java8APISuite implements Serializable {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
     doubles.collect();
-    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x))
+    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x))
       .cache();
     pairs.collect();
     JavaRDD<String> strings = rdd.map(x -> x.toString()).cache();
@@ -194,31 +194,31 @@ public class Java8APISuite implements Serializable {
     Assert.assertEquals(11, words.count());
 
     JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
-      List<Tuple2<String, String>> pairs2 = new LinkedList<Tuple2<String, String>>();
-      for (String word : s.split(" ")) pairs2.add(new Tuple2<String, String>(word, word));
+      List<Tuple2<String, String>> pairs2 = new LinkedList<>();
+      for (String word : s.split(" ")) pairs2.add(new Tuple2<>(word, word));
       return pairs2;
     });
 
-    Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
+    Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first());
     Assert.assertEquals(11, pairs.count());
 
     JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
-      List<Double> lengths = new LinkedList<Double>();
+      List<Double> lengths = new LinkedList<>();
       for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
       return lengths;
     });
 
     Double x = doubles.first();
-    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+    Assert.assertEquals(5.0, doubles.first(), 0.01);
     Assert.assertEquals(11, pairs.count());
   }
 
   @Test
   public void mapsFromPairsToPairs() {
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
     );
     JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
 
@@ -251,19 +251,18 @@ public class Java8APISuite implements Serializable {
     tempDir.deleteOnExit();
     String outputDir = new File(tempDir, "output").getAbsolutePath();
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
+      new Tuple2<>(1, "a"),
+      new Tuple2<>(2, "aa"),
+      new Tuple2<>(3, "aaa")
     );
     JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
 
-    rdd.mapToPair(pair ->
-      new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())))
+    rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
       .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
 
     // Try reading the output back as an object file
     JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
-      .mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString()));
+      .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
     Assert.assertEquals(pairs, readRDD.collect());
     Utils.deleteRecursively(tempDir);
   }
@@ -325,7 +324,7 @@ public class Java8APISuite implements Serializable {
       }
     };
 
-    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+    final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
     rdd.foreach(x -> floatAccum.add((float) x));
     Assert.assertEquals((Float) 25.0f, floatAccum.value());
 
@@ -338,22 +337,22 @@ public class Java8APISuite implements Serializable {
   public void keyBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
     List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect();
-    Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
-    Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+    Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
+    Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
   }
 
   @Test
   public void mapOnPairRDD() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
     JavaPairRDD<Integer, Integer> rdd2 =
-      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+      rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
     JavaPairRDD<Integer, Integer> rdd3 =
-      rdd2.mapToPair(in -> new Tuple2<Integer, Integer>(in._2(), in._1()));
+      rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
     Assert.assertEquals(Arrays.asList(
       new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(0, 2),
-      new Tuple2<Integer, Integer>(1, 3),
-      new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+      new Tuple2<>(0, 2),
+      new Tuple2<>(1, 3),
+      new Tuple2<>(0, 4)), rdd3.collect());
   }
 
   @Test
@@ -361,7 +360,7 @@ public class Java8APISuite implements Serializable {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
 
     JavaPairRDD<Integer, Integer> rdd2 =
-      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
+      rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
     List[] parts = rdd1.collectPartitions(new int[]{0});
     Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
 
@@ -369,16 +368,13 @@ public class Java8APISuite implements Serializable {
     Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
     Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
 
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(2, 0)),
+    Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
       rdd2.collectPartitions(new int[]{0})[0]);
 
     parts = rdd2.collectPartitions(new int[]{1, 2});
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
-      new Tuple2<Integer, Integer>(4, 0)), parts[0]);
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
-      new Tuple2<Integer, Integer>(6, 0),
-      new Tuple2<Integer, Integer>(7, 1)), parts[1]);
+    Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts[0]);
+    Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
+      parts[1]);
   }
 
   @Test
@@ -386,7 +382,7 @@ public class Java8APISuite implements Serializable {
     // Regression test for SPARK-1040
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1}));
     JavaPairRDD<Integer, int[]> pairRDD =
-      rdd.mapToPair(x -> new Tuple2<Integer, int[]>(x, new int[]{x}));
+      rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
     pairRDD.collect();  // Works fine
     Map<Integer, int[]> map = pairRDD.collectAsMap();  // Used to crash with ClassCastException
   }
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 43df0dea614bcf54ed5e6fc2dab6cfed2866d5b9..73091cfe2c09e1534c45462cbe54aaed1856f860 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -39,6 +39,7 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
  * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8
  * lambda syntax.
  */
+@SuppressWarnings("unchecked")
 public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
 
   @Test
@@ -52,7 +53,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
       Arrays.asList(9, 4));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<Integer> letterCount = stream.map(s -> s.length());
+    JavaDStream<Integer> letterCount = stream.map(String::length);
     JavaTestUtils.attachTestOutputStream(letterCount);
     List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -63,7 +64,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testFilter() {
     List<List<String>> inputData = Arrays.asList(
       Arrays.asList("giants", "dodgers"),
-      Arrays.asList("yankees", "red socks"));
+      Arrays.asList("yankees", "red sox"));
 
     List<List<String>> expected = Arrays.asList(
       Arrays.asList("giants"),
@@ -81,11 +82,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testMapPartitions() {
     List<List<String>> inputData = Arrays.asList(
       Arrays.asList("giants", "dodgers"),
-      Arrays.asList("yankees", "red socks"));
+      Arrays.asList("yankees", "red sox"));
 
     List<List<String>> expected = Arrays.asList(
       Arrays.asList("GIANTSDODGERS"),
-      Arrays.asList("YANKEESRED SOCKS"));
+      Arrays.asList("YANKEESRED SOX"));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaDStream<String> mapped = stream.mapPartitions(in -> {
@@ -172,7 +173,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
 
     List<List<Tuple2<String, Integer>>> pairInputData =
-      Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+      Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
       JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
 
@@ -192,32 +193,32 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testTransformWith() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
       Arrays.asList(
-        new Tuple2<String, String>("california", "dodgers"),
-        new Tuple2<String, String>("new york", "yankees")),
+        new Tuple2<>("california", "dodgers"),
+        new Tuple2<>("new york", "yankees")),
       Arrays.asList(
-        new Tuple2<String, String>("california", "sharks"),
-        new Tuple2<String, String>("new york", "rangers")));
+        new Tuple2<>("california", "sharks"),
+        new Tuple2<>("new york", "rangers")));
 
     List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
       Arrays.asList(
-        new Tuple2<String, String>("california", "giants"),
-        new Tuple2<String, String>("new york", "mets")),
+        new Tuple2<>("california", "giants"),
+        new Tuple2<>("new york", "mets")),
       Arrays.asList(
-        new Tuple2<String, String>("california", "ducks"),
-        new Tuple2<String, String>("new york", "islanders")));
+        new Tuple2<>("california", "ducks"),
+        new Tuple2<>("new york", "islanders")));
 
 
-    List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+    List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
       Sets.newHashSet(
-        new Tuple2<String, Tuple2<String, String>>("california",
-          new Tuple2<String, String>("dodgers", "giants")),
-        new Tuple2<String, Tuple2<String, String>>("new york",
-          new Tuple2<String, String>("yankees", "mets"))),
+        new Tuple2<>("california",
+          new Tuple2<>("dodgers", "giants")),
+        new Tuple2<>("new york",
+          new Tuple2<>("yankees", "mets"))),
       Sets.newHashSet(
-        new Tuple2<String, Tuple2<String, String>>("california",
-          new Tuple2<String, String>("sharks", "ducks")),
-        new Tuple2<String, Tuple2<String, String>>("new york",
-          new Tuple2<String, String>("rangers", "islanders"))));
+        new Tuple2<>("california",
+          new Tuple2<>("sharks", "ducks")),
+        new Tuple2<>("new york",
+          new Tuple2<>("rangers", "islanders"))));
 
     JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
       ssc, stringStringKVStream1, 1);
@@ -232,7 +233,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     JavaTestUtils.attachTestOutputStream(joined);
     List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-    List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+    List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
     for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
       unorderedResult.add(Sets.newHashSet(res));
     }
@@ -251,9 +252,9 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
 
     List<List<Tuple2<String, Integer>>> pairInputData1 =
-      Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+      Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
     List<List<Tuple2<Double, Character>>> pairInputData2 =
-      Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+      Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
     JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
       JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
     JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
@@ -293,13 +294,13 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     );
 
     List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
-      Arrays.asList(new Tuple2<Integer, String>(1, "x")),
-      Arrays.asList(new Tuple2<Integer, String>(2, "y"))
+      Arrays.asList(new Tuple2<>(1, "x")),
+      Arrays.asList(new Tuple2<>(2, "y"))
     );
 
     List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(1, new Tuple2<Integer, String>(1, "x"))),
-      Arrays.asList(new Tuple2<Integer, Tuple2<Integer, String>>(2, new Tuple2<Integer, String>(2, "y")))
+      Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
+      Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
     );
 
     JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
@@ -312,7 +313,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     // This is just to test whether this transform to JavaStream compiles
     JavaDStream<Long> transformed1 = ssc.transform(
       listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
-      assert (listOfRDDs.size() == 2);
+      Assert.assertEquals(2, listOfRDDs.size());
       return null;
     });
 
@@ -321,13 +322,13 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
       listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
-      assert (listOfRDDs.size() == 3);
+      Assert.assertEquals(3, listOfRDDs.size());
       JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
       JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
       JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
       JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
       PairFunction<Integer, Integer, Integer> mapToTuple =
-        (Integer i) -> new Tuple2<Integer, Integer>(i, i);
+        (Integer i) -> new Tuple2<>(i, i);
       return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
     });
     JavaTestUtils.attachTestOutputStream(transformed2);
@@ -365,36 +366,36 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, String>(6, "g"),
-        new Tuple2<Integer, String>(6, "i"),
-        new Tuple2<Integer, String>(6, "a"),
-        new Tuple2<Integer, String>(6, "n"),
-        new Tuple2<Integer, String>(6, "t"),
-        new Tuple2<Integer, String>(6, "s")),
+        new Tuple2<>(6, "g"),
+        new Tuple2<>(6, "i"),
+        new Tuple2<>(6, "a"),
+        new Tuple2<>(6, "n"),
+        new Tuple2<>(6, "t"),
+        new Tuple2<>(6, "s")),
       Arrays.asList(
-        new Tuple2<Integer, String>(7, "d"),
-        new Tuple2<Integer, String>(7, "o"),
-        new Tuple2<Integer, String>(7, "d"),
-        new Tuple2<Integer, String>(7, "g"),
-        new Tuple2<Integer, String>(7, "e"),
-        new Tuple2<Integer, String>(7, "r"),
-        new Tuple2<Integer, String>(7, "s")),
+        new Tuple2<>(7, "d"),
+        new Tuple2<>(7, "o"),
+        new Tuple2<>(7, "d"),
+        new Tuple2<>(7, "g"),
+        new Tuple2<>(7, "e"),
+        new Tuple2<>(7, "r"),
+        new Tuple2<>(7, "s")),
       Arrays.asList(
-        new Tuple2<Integer, String>(9, "a"),
-        new Tuple2<Integer, String>(9, "t"),
-        new Tuple2<Integer, String>(9, "h"),
-        new Tuple2<Integer, String>(9, "l"),
-        new Tuple2<Integer, String>(9, "e"),
-        new Tuple2<Integer, String>(9, "t"),
-        new Tuple2<Integer, String>(9, "i"),
-        new Tuple2<Integer, String>(9, "c"),
-        new Tuple2<Integer, String>(9, "s")));
+        new Tuple2<>(9, "a"),
+        new Tuple2<>(9, "t"),
+        new Tuple2<>(9, "h"),
+        new Tuple2<>(9, "l"),
+        new Tuple2<>(9, "e"),
+        new Tuple2<>(9, "t"),
+        new Tuple2<>(9, "i"),
+        new Tuple2<>(9, "c"),
+        new Tuple2<>(9, "s")));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
       List<Tuple2<Integer, String>> out = Lists.newArrayList();
       for (String letter : s.split("(?!^)")) {
-        out.add(new Tuple2<Integer, String>(s.length(), letter));
+        out.add(new Tuple2<>(s.length(), letter));
       }
       return out;
     });
@@ -411,12 +412,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
    */
   public static <T extends Comparable<T>> void assertOrderInvariantEquals(
     List<List<T>> expected, List<List<T>> actual) {
-    for (List<T> list : expected) {
-      Collections.sort(list);
-    }
-    for (List<T> list : actual) {
-      Collections.sort(list);
-    }
+    expected.forEach((List<T> list) -> Collections.sort(list));
+    actual.forEach((List<T> list) -> Collections.sort(list));
     Assert.assertEquals(expected, actual);
   }
 
@@ -424,11 +421,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testPairFilter() {
     List<List<String>> inputData = Arrays.asList(
       Arrays.asList("giants", "dodgers"),
-      Arrays.asList("yankees", "red socks"));
+      Arrays.asList("yankees", "red sox"));
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
-      Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+      Arrays.asList(new Tuple2<>("giants", 6)),
+      Arrays.asList(new Tuple2<>("yankees", 7)));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream =
@@ -441,26 +438,26 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   }
 
   List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
-    Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
-      new Tuple2<String, String>("california", "giants"),
-      new Tuple2<String, String>("new york", "yankees"),
-      new Tuple2<String, String>("new york", "mets")),
-    Arrays.asList(new Tuple2<String, String>("california", "sharks"),
-      new Tuple2<String, String>("california", "ducks"),
-      new Tuple2<String, String>("new york", "rangers"),
-      new Tuple2<String, String>("new york", "islanders")));
+    Arrays.asList(new Tuple2<>("california", "dodgers"),
+      new Tuple2<>("california", "giants"),
+      new Tuple2<>("new york", "yankees"),
+      new Tuple2<>("new york", "mets")),
+    Arrays.asList(new Tuple2<>("california", "sharks"),
+      new Tuple2<>("california", "ducks"),
+      new Tuple2<>("new york", "rangers"),
+      new Tuple2<>("new york", "islanders")));
 
   List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
     Arrays.asList(
-      new Tuple2<String, Integer>("california", 1),
-      new Tuple2<String, Integer>("california", 3),
-      new Tuple2<String, Integer>("new york", 4),
-      new Tuple2<String, Integer>("new york", 1)),
+      new Tuple2<>("california", 1),
+      new Tuple2<>("california", 3),
+      new Tuple2<>("new york", 4),
+      new Tuple2<>("new york", 1)),
     Arrays.asList(
-      new Tuple2<String, Integer>("california", 5),
-      new Tuple2<String, Integer>("california", 5),
-      new Tuple2<String, Integer>("new york", 3),
-      new Tuple2<String, Integer>("new york", 1)));
+      new Tuple2<>("california", 5),
+      new Tuple2<>("california", 5),
+      new Tuple2<>("new york", 3),
+      new Tuple2<>("new york", 1)));
 
   @Test
   public void testPairMap() { // Maps pair -> pair of different type
@@ -468,15 +465,15 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, String>(1, "california"),
-        new Tuple2<Integer, String>(3, "california"),
-        new Tuple2<Integer, String>(4, "new york"),
-        new Tuple2<Integer, String>(1, "new york")),
+        new Tuple2<>(1, "california"),
+        new Tuple2<>(3, "california"),
+        new Tuple2<>(4, "new york"),
+        new Tuple2<>(1, "new york")),
       Arrays.asList(
-        new Tuple2<Integer, String>(5, "california"),
-        new Tuple2<Integer, String>(5, "california"),
-        new Tuple2<Integer, String>(3, "new york"),
-        new Tuple2<Integer, String>(1, "new york")));
+        new Tuple2<>(5, "california"),
+        new Tuple2<>(5, "california"),
+        new Tuple2<>(3, "new york"),
+        new Tuple2<>(1, "new york")));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -494,21 +491,21 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, String>(1, "california"),
-        new Tuple2<Integer, String>(3, "california"),
-        new Tuple2<Integer, String>(4, "new york"),
-        new Tuple2<Integer, String>(1, "new york")),
+        new Tuple2<>(1, "california"),
+        new Tuple2<>(3, "california"),
+        new Tuple2<>(4, "new york"),
+        new Tuple2<>(1, "new york")),
       Arrays.asList(
-        new Tuple2<Integer, String>(5, "california"),
-        new Tuple2<Integer, String>(5, "california"),
-        new Tuple2<Integer, String>(3, "new york"),
-        new Tuple2<Integer, String>(1, "new york")));
+        new Tuple2<>(5, "california"),
+        new Tuple2<>(5, "california"),
+        new Tuple2<>(3, "new york"),
+        new Tuple2<>(1, "new york")));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
-      LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+      LinkedList<Tuple2<Integer, String>> out = new LinkedList<>();
       while (in.hasNext()) {
         Tuple2<String, Integer> next = in.next();
         out.add(next.swap());
@@ -530,7 +527,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
       Arrays.asList(1, 3, 4, 1),
       Arrays.asList(5, 5, 3, 1));
 
-    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
     JavaTestUtils.attachTestOutputStream(reversed);
@@ -543,31 +541,31 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
     List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
       Arrays.asList(
-        new Tuple2<String, Integer>("hi", 1),
-        new Tuple2<String, Integer>("ho", 2)),
+        new Tuple2<>("hi", 1),
+        new Tuple2<>("ho", 2)),
       Arrays.asList(
-        new Tuple2<String, Integer>("hi", 1),
-        new Tuple2<String, Integer>("ho", 2)));
+        new Tuple2<>("hi", 1),
+        new Tuple2<>("ho", 2)));
 
     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, String>(1, "h"),
-        new Tuple2<Integer, String>(1, "i"),
-        new Tuple2<Integer, String>(2, "h"),
-        new Tuple2<Integer, String>(2, "o")),
+        new Tuple2<>(1, "h"),
+        new Tuple2<>(1, "i"),
+        new Tuple2<>(2, "h"),
+        new Tuple2<>(2, "o")),
       Arrays.asList(
-        new Tuple2<Integer, String>(1, "h"),
-        new Tuple2<Integer, String>(1, "i"),
-        new Tuple2<Integer, String>(2, "h"),
-        new Tuple2<Integer, String>(2, "o")));
+        new Tuple2<>(1, "h"),
+        new Tuple2<>(1, "i"),
+        new Tuple2<>(2, "h"),
+        new Tuple2<>(2, "o")));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
-      List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+      List<Tuple2<Integer, String>> out = new LinkedList<>();
       for (Character s : in._1().toCharArray()) {
-        out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+        out.add(new Tuple2<>(in._2(), s.toString()));
       }
       return out;
     });
@@ -584,11 +582,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<String, Integer>("california", 4),
-        new Tuple2<String, Integer>("new york", 5)),
+        new Tuple2<>("california", 4),
+        new Tuple2<>("new york", 5)),
       Arrays.asList(
-        new Tuple2<String, Integer>("california", 10),
-        new Tuple2<String, Integer>("new york", 4)));
+        new Tuple2<>("california", 10),
+        new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
       ssc, inputData, 1);
@@ -608,11 +606,11 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<String, Integer>("california", 4),
-        new Tuple2<String, Integer>("new york", 5)),
+        new Tuple2<>("california", 4),
+        new Tuple2<>("new york", 5)),
       Arrays.asList(
-        new Tuple2<String, Integer>("california", 10),
-        new Tuple2<String, Integer>("new york", 4)));
+        new Tuple2<>("california", 10),
+        new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
       ssc, inputData, 1);
@@ -632,12 +630,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, Integer>("california", 4),
-        new Tuple2<String, Integer>("new york", 5)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 14),
-        new Tuple2<String, Integer>("new york", 9)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 10),
-        new Tuple2<String, Integer>("new york", 4)));
+      Arrays.asList(new Tuple2<>("california", 4),
+        new Tuple2<>("new york", 5)),
+      Arrays.asList(new Tuple2<>("california", 14),
+        new Tuple2<>("new york", 9)),
+      Arrays.asList(new Tuple2<>("california", 10),
+        new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -656,12 +654,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, Integer>("california", 4),
-        new Tuple2<String, Integer>("new york", 5)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 14),
-        new Tuple2<String, Integer>("new york", 9)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 14),
-        new Tuple2<String, Integer>("new york", 9)));
+      Arrays.asList(new Tuple2<>("california", 4),
+        new Tuple2<>("new york", 5)),
+      Arrays.asList(new Tuple2<>("california", 14),
+        new Tuple2<>("new york", 9)),
+      Arrays.asList(new Tuple2<>("california", 14),
+        new Tuple2<>("new york", 9)));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -689,12 +687,12 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
 
     List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, Integer>("california", 4),
-        new Tuple2<String, Integer>("new york", 5)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 14),
-        new Tuple2<String, Integer>("new york", 9)),
-      Arrays.asList(new Tuple2<String, Integer>("california", 10),
-        new Tuple2<String, Integer>("new york", 4)));
+      Arrays.asList(new Tuple2<>("california", 4),
+        new Tuple2<>("new york", 5)),
+      Arrays.asList(new Tuple2<>("california", 14),
+        new Tuple2<>("new york", 9)),
+      Arrays.asList(new Tuple2<>("california", 10),
+        new Tuple2<>("new york", 4)));
 
     JavaDStream<Tuple2<String, Integer>> stream =
       JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -713,27 +711,27 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testPairTransform() {
     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(1, 5),
-        new Tuple2<Integer, Integer>(4, 5),
-        new Tuple2<Integer, Integer>(2, 5)),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(1, 5),
+        new Tuple2<>(4, 5),
+        new Tuple2<>(2, 5)),
       Arrays.asList(
-        new Tuple2<Integer, Integer>(2, 5),
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(4, 5),
-        new Tuple2<Integer, Integer>(1, 5)));
+        new Tuple2<>(2, 5),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(4, 5),
+        new Tuple2<>(1, 5)));
 
     List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, Integer>(1, 5),
-        new Tuple2<Integer, Integer>(2, 5),
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(4, 5)),
+        new Tuple2<>(1, 5),
+        new Tuple2<>(2, 5),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(4, 5)),
       Arrays.asList(
-        new Tuple2<Integer, Integer>(1, 5),
-        new Tuple2<Integer, Integer>(2, 5),
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(4, 5)));
+        new Tuple2<>(1, 5),
+        new Tuple2<>(2, 5),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(4, 5)));
 
     JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
       ssc, inputData, 1);
@@ -751,15 +749,15 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
   public void testPairToNormalRDDTransform() {
     List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
       Arrays.asList(
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(1, 5),
-        new Tuple2<Integer, Integer>(4, 5),
-        new Tuple2<Integer, Integer>(2, 5)),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(1, 5),
+        new Tuple2<>(4, 5),
+        new Tuple2<>(2, 5)),
       Arrays.asList(
-        new Tuple2<Integer, Integer>(2, 5),
-        new Tuple2<Integer, Integer>(3, 5),
-        new Tuple2<Integer, Integer>(4, 5),
-        new Tuple2<Integer, Integer>(1, 5)));
+        new Tuple2<>(2, 5),
+        new Tuple2<>(3, 5),
+        new Tuple2<>(4, 5),
+        new Tuple2<>(1, 5)));
 
     List<List<Integer>> expected = Arrays.asList(
       Arrays.asList(3, 1, 4, 2),
@@ -780,20 +778,20 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
 
     List<List<Tuple2<String, String>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
-        new Tuple2<String, String>("california", "GIANTS"),
-        new Tuple2<String, String>("new york", "YANKEES"),
-        new Tuple2<String, String>("new york", "METS")),
-      Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
-        new Tuple2<String, String>("california", "DUCKS"),
-        new Tuple2<String, String>("new york", "RANGERS"),
-        new Tuple2<String, String>("new york", "ISLANDERS")));
+      Arrays.asList(new Tuple2<>("california", "DODGERS"),
+        new Tuple2<>("california", "GIANTS"),
+        new Tuple2<>("new york", "YANKEES"),
+        new Tuple2<>("new york", "METS")),
+      Arrays.asList(new Tuple2<>("california", "SHARKS"),
+        new Tuple2<>("california", "DUCKS"),
+        new Tuple2<>("new york", "RANGERS"),
+        new Tuple2<>("new york", "ISLANDERS")));
 
     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
       ssc, inputData, 1);
     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase());
+    JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase);
     JavaTestUtils.attachTestOutputStream(mapped);
     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -805,34 +803,29 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
 
     List<List<Tuple2<String, String>>> expected = Arrays.asList(
-      Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
-        new Tuple2<String, String>("california", "dodgers2"),
-        new Tuple2<String, String>("california", "giants1"),
-        new Tuple2<String, String>("california", "giants2"),
-        new Tuple2<String, String>("new york", "yankees1"),
-        new Tuple2<String, String>("new york", "yankees2"),
-        new Tuple2<String, String>("new york", "mets1"),
-        new Tuple2<String, String>("new york", "mets2")),
-      Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
-        new Tuple2<String, String>("california", "sharks2"),
-        new Tuple2<String, String>("california", "ducks1"),
-        new Tuple2<String, String>("california", "ducks2"),
-        new Tuple2<String, String>("new york", "rangers1"),
-        new Tuple2<String, String>("new york", "rangers2"),
-        new Tuple2<String, String>("new york", "islanders1"),
-        new Tuple2<String, String>("new york", "islanders2")));
+      Arrays.asList(new Tuple2<>("california", "dodgers1"),
+        new Tuple2<>("california", "dodgers2"),
+        new Tuple2<>("california", "giants1"),
+        new Tuple2<>("california", "giants2"),
+        new Tuple2<>("new york", "yankees1"),
+        new Tuple2<>("new york", "yankees2"),
+        new Tuple2<>("new york", "mets1"),
+        new Tuple2<>("new york", "mets2")),
+      Arrays.asList(new Tuple2<>("california", "sharks1"),
+        new Tuple2<>("california", "sharks2"),
+        new Tuple2<>("california", "ducks1"),
+        new Tuple2<>("california", "ducks2"),
+        new Tuple2<>("new york", "rangers1"),
+        new Tuple2<>("new york", "rangers2"),
+        new Tuple2<>("new york", "islanders1"),
+        new Tuple2<>("new york", "islanders2")));
 
     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
       ssc, inputData, 1);
     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-
-    JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(in -> {
-      List<String> out = new ArrayList<String>();
-      out.add(in + "1");
-      out.add(in + "2");
-      return out;
-    });
+    JavaPairDStream<String, String> flatMapped =
+      pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2"));
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
     Assert.assertEquals(expected, result);