diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 6550fcc0521c36680c762334a31bd4c48dd6e4a6..78ae6a7407467296a8a71a7252407205817fa342 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1395,13 +1395,13 @@ object WordBlacklist { object DroppedWordsCounter { - @volatile private var instance: Accumulator[Long] = null + @volatile private var instance: LongAccumulator = null - def getInstance(sc: SparkContext): Accumulator[Long] = { + def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { - instance = sc.accumulator(0L, "WordsInBlacklistCounter") + instance = sc.longAccumulator("WordsInBlacklistCounter") } } } @@ -1409,7 +1409,7 @@ object DroppedWordsCounter { } } -wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { +wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator @@ -1417,12 +1417,12 @@ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { - droppedWordsCounter += count + droppedWordsCounter.add(count) false } else { true } - }.collect() + }.collect().mkString("[", ", ", "]") val output = "Counts at time " + time + " " + counts }) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index acbcb0c4b7b78649443e64e05569ef40493524c3..49c0427321133d1cb994628306aa003f0f59859f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -27,8 +27,7 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext, Time} -import org.apache.spark.util.IntParam -import org.apache.spark.util.LongAccumulator +import org.apache.spark.util.{IntParam, LongAccumulator} /** * Use this singleton to get or register a Broadcast variable.