From 1faef149f763f4a54aaa6e17043d0a628ae338a0 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@apache.org>
Date: Tue, 3 Jun 2014 18:37:40 -0700
Subject: [PATCH] SPARK-1941: Update streamlib to 2.7.0 and use HyperLogLogPlus
 instead of HyperLogLog.

I also corrected some errors made in the previous HLL count approximate API, including relativeSD wasn't really a measure for error (and we used it to test error bounds in test results).

Author: Reynold Xin <rxin@apache.org>

Closes #897 from rxin/hll and squashes the following commits:

4d83f41 [Reynold Xin] New error bound and non-randomness.
f154ea0 [Reynold Xin] Added a comment on the value bound for testing.
e367527 [Reynold Xin] One more round of code review.
41e649a [Reynold Xin] Update final mima list.
9e320c8 [Reynold Xin] Incorporate code review feedback.
e110d70 [Reynold Xin] Merge branch 'master' into hll
354deb8 [Reynold Xin] Added comment on the Mima exclude rules.
acaa524 [Reynold Xin] Added the right exclude rules in MimaExcludes.
6555bfe [Reynold Xin] Added a default method and re-arranged MimaExcludes.
1db1522 [Reynold Xin] Excluded util.SerializableHyperLogLog from MIMA check.
9221b27 [Reynold Xin] Merge branch 'master' into hll
88cfe77 [Reynold Xin] Updated documentation and restored the old incorrect API to maintain API compatibility.
1294be6 [Reynold Xin] Updated HLL+.
e7786cb [Reynold Xin] Merge branch 'master' into hll
c0ef0c2 [Reynold Xin] SPARK-1941: Update streamlib to 2.7.0 and use HyperLogLogPlus instead of HyperLogLog.
---
 .../apache/spark/api/java/JavaPairRDD.scala   | 51 ++++++-----
 .../apache/spark/api/java/JavaRDDLike.scala   | 12 +--
 .../apache/spark/rdd/PairRDDFunctions.scala   | 90 ++++++++++++++-----
 .../main/scala/org/apache/spark/rdd/RDD.scala | 53 ++++++++---
 .../spark/util/SerializableHyperLogLog.scala  | 52 -----------
 .../java/org/apache/spark/JavaAPISuite.java   | 10 +--
 .../spark/rdd/PairRDDFunctionsSuite.scala     | 22 ++---
 .../scala/org/apache/spark/rdd/RDDSuite.scala |  6 +-
 pom.xml                                       |  4 +-
 project/MimaExcludes.scala                    | 22 ++++-
 project/SparkBuild.scala                      |  2 +-
 11 files changed, 189 insertions(+), 135 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 4c8f9ed6fb..7dcfbf741c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -672,38 +672,47 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 
   /**
    * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. Uses the provided
-   * Partitioner to partition the output RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
+   * @param partitioner partitioner of the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD, partitioner)
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
+  {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
   }
 
   /**
-   * Return approximate number of distinct values for each key this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
-   * level.
+   * Return approximate number of distinct values for each key in this RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
+   * @param numPartitions number of partitions of the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD)
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
   }
 
-
   /**
    * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
-   * output RDD into numPartitions.
    *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
    */
-  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD, numPartitions)
+  def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD))
   }
 
   /** Assign a name to this RDD */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 619bfd75be..330569a8d8 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -560,12 +560,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return approximate number of distinct elements in the RDD.
    *
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05.
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
    */
-  def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
+  def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)
 
   def name(): String = rdd.name
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 223fef7926..f2ce3cbd47 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.SequenceFile.CompressionType
@@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.SerializableHyperLogLog
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -214,39 +213,88 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
+   * :: Experimental ::
+   *
    * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
-   * Partitioner to partition the output RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
+   * would trigger sparse representation of registers, which may reduce the memory consumption
+   * and increase accuracy when the cardinality is small.
+   *
+   * @param p The precision value for the normal set.
+   *          `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
+   * @param sp The precision value for the sparse set, between 0 and 32.
+   *           If `sp` equals 0, the sparse representation is skipped.
+   * @param partitioner Partitioner to use for the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
-    val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
-    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
-    val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+  @Experimental
+  def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
+    require(p >= 4, s"p ($p) must be >= 4")
+    require(sp <= 32, s"sp ($sp) must be <= 32")
+    require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
+    val createHLL = (v: V) => {
+      val hll = new HyperLogLogPlus(p, sp)
+      hll.offer(v)
+      hll
+    }
+    val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
+      hll.offer(v)
+      hll
+    }
+    val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+      h1.addAll(h2)
+      h1
+    }
+
+    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
+  }
 
-    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
+   * @param partitioner partitioner of the resulting RDD
+   */
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+    require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
+    val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
+    assert(p <= 32)
+    countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner)
   }
 
   /**
    * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vice versa. HashPartitions the
-   * output RDD into numPartitions.
    *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
+   * @param numPartitions number of partitions of the resulting RDD
    */
   def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
     countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
   }
 
   /**
-   * Return approximate number of distinct values for each key this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vice versa. The default value of
-   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
-   * level.
+   * Return approximate number of distinct values for each key in this RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
    */
   def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
     countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index aa03e9276f..585b2f76af 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -19,12 +19,11 @@ package org.apache.spark.rdd
 
 import java.util.Random
 
-import scala.collection.Map
-import scala.collection.mutable
+import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.{classTag, ClassTag}
 
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.NullWritable
@@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}
 
@@ -921,15 +920,49 @@ abstract class RDD[T: ClassTag](
    * :: Experimental ::
    * Return approximate number of distinct elements in the RDD.
    *
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05.
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
+   * would trigger sparse representation of registers, which may reduce the memory consumption
+   * and increase accuracy when the cardinality is small.
+   *
+   * @param p The precision value for the normal set.
+   *          `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
+   * @param sp The precision value for the sparse set, between 0 and 32.
+   *           If `sp` equals 0, the sparse representation is skipped.
    */
   @Experimental
+  def countApproxDistinct(p: Int, sp: Int): Long = {
+    require(p >= 4, s"p ($p) must be greater than 0")
+    require(sp <= 32, s"sp ($sp) cannot be greater than 32")
+    require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
+    val zeroCounter = new HyperLogLogPlus(p, sp)
+    aggregate(zeroCounter)(
+      (hll: HyperLogLogPlus, v: T) => {
+        hll.offer(v)
+        hll
+      },
+      (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+        h1.addAll(h2)
+        h2
+      }).cardinality()
+  }
+
+  /**
+   * Return approximate number of distinct elements in the RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
+   * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+   *
+   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
+   *                   It must be greater than 0.000017.
+   */
   def countApproxDistinct(relativeSD: Double = 0.05): Long = {
-    val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
-    aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+    val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
+    countApproxDistinct(p, 0)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
deleted file mode 100644
index 21a88eea3b..0000000000
--- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-
-import com.clearspring.analytics.stream.cardinality.{HyperLogLog, ICardinality}
-
-/**
- * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is
- * serializable.
- */
-private[spark]
-class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
-
-  def this() = this(null)  // For deserialization
-
-  def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
-
-  def add[T](elem: T) = {
-    this.value.offer(elem)
-    this
-  }
-
-  def readExternal(in: ObjectInput) {
-    val byteLength = in.readInt()
-    val bytes = new Array[Byte](byteLength)
-    in.readFully(bytes)
-    value = HyperLogLog.Builder.build(bytes)
-  }
-
-  def writeExternal(out: ObjectOutput) {
-    val bytes = value.getBytes()
-    out.writeInt(bytes.length)
-    out.write(bytes)
-  }
-}
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 7193223add..b78309f81c 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1028,27 +1028,23 @@ public class JavaAPISuite implements Serializable {
       arrayData.add(i % size);
     }
     JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
+    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.1);
   }
 
   @Test
   public void countApproxDistinctByKey() {
-    double relativeSD = 0.001;
-
     List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
     for (int i = 10; i < 100; i++)
       for (int j = 0; j < i; j++)
         arrayData.add(new Tuple2<Integer, Integer>(i, j));
 
     JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
-    List<Tuple2<Integer, Object>> res =  pairRdd.countApproxDistinctByKey(relativeSD).collect();
+    List<Tuple2<Integer, Object>> res =  pairRdd.countApproxDistinctByKey(8, 0).collect();
     for (Tuple2<Integer, Object> resItem : res) {
       double count = (double)resItem._1();
       Long resCount = (Long)resItem._2();
       Double error = Math.abs((resCount - count) / count);
-      Assert.assertTrue(error < relativeSD);
+      Assert.assertTrue(error < 0.1);
     }
 
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 1230565ea5..9ddafc4518 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -119,28 +119,30 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
      * relatively tight error bounds to check correctness of functionality rather than checking
      * whether the approximation conforms with the requested bound.
      */
-    val relativeSD = 0.001
+    val p = 20
+    val sp = 0
+    // When p = 20, the relative accuracy is about 0.001. So with high probability, the
+    // relative error should be smaller than the threshold 0.01 we use here.
+    val relativeSD = 0.01
 
     // For each value i, there are i tuples with first element equal to i.
     // Therefore, the expected count for key i would be i.
     val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
     val rdd1 = sc.parallelize(stacked)
-    val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
-    counted1.foreach{
-      case(k, count) => assert(error(count, k) < relativeSD)
-    }
+    val counted1 = rdd1.countApproxDistinctByKey(p, sp).collect()
+    counted1.foreach { case (k, count) => assert(error(count, k) < relativeSD) }
 
-    val rnd = new Random()
+    val rnd = new Random(42)
 
     // The expected count for key num would be num
     val randStacked = (1 to 100).flatMap { i =>
-      val num = rnd.nextInt % 500
+      val num = rnd.nextInt() % 500
       (1 to num).map(j => (num, j))
     }
     val rdd2 = sc.parallelize(randStacked)
-    val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
-    counted2.foreach{
-      case(k, count) => assert(error(count, k) < relativeSD)
+    val counted2 = rdd2.countApproxDistinctByKey(relativeSD).collect()
+    counted2.foreach { case (k, count) =>
+      assert(error(count, k) < relativeSD, s"${error(count, k)} < $relativeSD")
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index e686068f7a..bbd0c14178 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -73,10 +73,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val size = 100
     val uniformDistro = for (i <- 1 to 100000) yield i % size
     val simpleRdd = sc.makeRDD(uniformDistro)
-    assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
-    assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
-    assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
-    assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
+    assert(error(simpleRdd.countApproxDistinct(4, 0), size) < 0.4)
+    assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.1)
   }
 
   test("SparkContext.union") {
diff --git a/pom.xml b/pom.xml
index 0a5ca9e72a..fcd6f66b44 100644
--- a/pom.xml
+++ b/pom.xml
@@ -300,9 +300,9 @@
       <dependency>
         <groupId>com.clearspring.analytics</groupId>
         <artifactId>stream</artifactId>
-        <version>2.5.1</version>
+        <version>2.7.0</version>
         <exclusions>
-          <!-- Only HyperLogLog is used, which doesn't depend on fastutil -->
+          <!-- Only HyperLogLogPlus is used, which doesn't depend on fastutil -->
           <exclusion>
             <groupId>it.unimi.dsi</groupId>
             <artifactId>fastutil</artifactId>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fc9cbeaec6..fadf6a4d8b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -16,7 +16,6 @@
  */
 
 import com.typesafe.tools.mima.core._
-import com.typesafe.tools.mima.core.ProblemFilters._
 
 /**
  * Additional excludes for checking of Spark's binary compatibility.
@@ -35,8 +34,27 @@ object MimaExcludes {
     val excludes =
       SparkBuild.SPARK_VERSION match {
         case v if v.startsWith("1.1") =>
+          Seq(MimaBuild.excludeSparkPackage("graphx")) ++
           Seq(
-            MimaBuild.excludeSparkPackage("graphx"))
+            // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
+            // for countApproxDistinct* functions, which does not work in Java. We later removed
+            // them, and use the following to tell Mima to not care about them.
+            ProblemFilters.exclude[IncompatibleResultTypeProblem](
+              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
+            ProblemFilters.exclude[IncompatibleResultTypeProblem](
+              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1")
+          ) ++
+          MimaBuild.excludeSparkClass("util.SerializableHyperLogLog")
         case v if v.startsWith("1.0") =>
           Seq(
             MimaBuild.excludeSparkPackage("api.java"),
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c2a20d86b2..efb0b9319b 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -362,7 +362,7 @@ object SparkBuild extends Build {
         "com.twitter"               %% "chill"            % chillVersion excludeAll(excludeAsm),
         "com.twitter"                % "chill-java"       % chillVersion excludeAll(excludeAsm),
         "org.tachyonproject"         % "tachyon"          % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
-        "com.clearspring.analytics"  % "stream"           % "2.5.1" excludeAll(excludeFastutil),
+        "com.clearspring.analytics"  % "stream"           % "2.7.0" excludeAll(excludeFastutil), // Only HyperLogLogPlus is used, which does not depend on fastutil.
         "org.spark-project"          % "pyrolite"         % "2.0.1",
         "net.sf.py4j"                % "py4j"             % "0.8.1"
       ),
-- 
GitLab