From a7de8e9b1c9859f45db4a620dd62a62d472d8396 Mon Sep 17 00:00:00 2001
From: Hossein Falaki <falaki@gmail.com>
Date: Mon, 30 Dec 2013 19:28:03 -0800
Subject: [PATCH] Renamed countDistinct and countDistinctByKey methods to
 include Approx

---
 .../scala/org/apache/spark/rdd/PairRDDFunctions.scala     | 10 +++++-----
 core/src/main/scala/org/apache/spark/rdd/RDD.scala        |  2 +-
 .../org/apache/spark/rdd/PairRDDFunctionsSuite.scala      |  6 +++---
 .../src/test/scala/org/apache/spark/rdd/RDDSuite.scala    | 10 +++++-----
 .../apache/spark/serializer/KryoSerializerSuite.scala     |  2 +-
 5 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 4e4f860b19..1dc5f8d2f5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -217,7 +217,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * more accurate counts but increase the memory footprint and vise versa. Uses the provided
    * Partitioner to partition the output RDD.
    */
-  def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
     val createHLL = (v: V) => {
       val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
       hll.value.offer(v)
@@ -242,8 +242,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * output RDD into numPartitions.
    *
    */
-  def countDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
-    countDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+    countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
   }

   /**
@@ -254,8 +254,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
    * level.
    */
-  def countDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
-    countDistinctByKey(relativeSD, defaultPartitioner(self))
+  def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+    countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
   }

   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 136fa45327..74fab48619 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -797,7 +797,7 @@ abstract class RDD[T: ClassTag](
    * more accurate counts but increase the memory footprint and vise versa. The default value of
    * relativeSD is 0.05.
    */
-  def countDistinct(relativeSD: Double = 0.05): Long = {
+  def countApproxDistinct(relativeSD: Double = 0.05): Long = {

     def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = {
       val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 6ad58b875d..5da538a1dd 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -110,7 +110,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
   }

-  test("countDistinctByKey") {
+  test("countApproxDistinctByKey") {
     def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble

     /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
@@ -124,7 +124,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     // Therefore, the expected count for key i would be i.
     val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
     val rdd1 = sc.parallelize(stacked)
-    val counted1 = rdd1.countDistinctByKey(relativeSD).collect()
+    val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
     counted1.foreach{
       case(k, count) => assert(error(count, k) < relativeSD)
     }
@@ -137,7 +137,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
       (1 to num).map(j => (num, j))
     }
     val rdd2 = sc.parallelize(randStacked)
-    val counted2 = rdd2.countDistinctByKey(relativeSD, 4).collect()
+    val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
     counted2.foreach{
       case(k, count) => assert(error(count, k) < relativeSD)
     }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 2f81b81797..1383359f85 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,17 +63,17 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }

-  test("countDistinct") {
+  test("countApproxDistinct") {

     def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble

     val size = 100
     val uniformDistro = for (i <- 1 to 100000) yield i % size
     val simpleRdd = sc.makeRDD(uniformDistro)
-    assert(error(simpleRdd.countDistinct(0.2), size) < 0.2)
-    assert(error(simpleRdd.countDistinct(0.05), size) < 0.05)
-    assert(error(simpleRdd.countDistinct(0.01), size) < 0.01)
-    assert(error(simpleRdd.countDistinct(0.001), size) < 0.001)
+    assert(error(simpleRdd.countApproxDistinct(0.2), size) < 0.2)
+    assert(error(simpleRdd.countApproxDistinct(0.05), size) < 0.05)
+    assert(error(simpleRdd.countApproxDistinct(0.01), size) < 0.01)
+    assert(error(simpleRdd.countApproxDistinct(0.001), size) < 0.001)
   }

   test("SparkContext.union") {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 18529710fe..636e3ab913 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -173,7 +173,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }

   test("kryo with SerializableHyperLogLog") {
-    assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countDistinct(0.01) === 3)
+    assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
   }

   test("kryo with reduce") {
--
GitLab
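
Usage sketch (illustration only, not part of the patch): after this rename, the approximate distinct-count API is reached through countApproxDistinct on any RDD and countApproxDistinctByKey on pair RDDs. The relativeSD argument is the target relative standard deviation of the underlying HyperLogLog estimate, so smaller values give more accurate counts at the cost of memory. The object name, local master, and sample data below are assumptions made for the example; only the two renamed methods come from the patch.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions (pre-1.3 style)

// Hypothetical driver program exercising the renamed methods.
object ApproxCountExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "ApproxCountExample")

    // Whole-RDD approximate distinct count: ~100 distinct values, relativeSD = 0.05.
    val nums = sc.parallelize(1 to 100000).map(_ % 100)
    println("approx distinct: " + nums.countApproxDistinct(0.05))

    // Per-key approximate distinct counts: key i has i distinct values.
    val pairs = sc.parallelize((1 to 100).flatMap(i => (1 to i).map(j => (i, j))))
    pairs.countApproxDistinctByKey(0.05).collect().foreach {
      case (k, count) => println(k + " -> " + count)
    }

    sc.stop()
  }
}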