diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 1dc5f8d2f5e1612d054d4d8102ec3ae7dab6fb1c..088b298aada26ace6c21df446c65516e0fd1e336 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -229,9 +229,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) } val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) - combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).map { - case (k, v) => (k, v.value.cardinality()) - } + combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality()) + } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 74fab4861938a44307d190b5e2adeacf6a966e30..161fd067e1ea254e1344da046eb257aa901fcd38 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -809,7 +809,9 @@ abstract class RDD[T: ClassTag]( } def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2) - mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality() + val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) + mapPartitions(hllCountPartition).aggregate(zeroCounter)(mergeCounters, mergeCounters) + .value.cardinality() } /**