Skip to content
Snippets Groups Projects
Commit a0eec8e8 authored by Liwei Lin's avatar Liwei Lin Committed by Sean Owen
Browse files

[SPARK-15208][WIP][CORE][STREAMING][DOCS] Update Spark examples with AccumulatorV2

## What changes were proposed in this pull request?

The patch updates the codes & docs in the example module as well as the related doc module:

- [ ] [docs] `streaming-programming-guide.md`
  - [x] scala code part
  - [ ] java code part
  - [ ] python code part
- [x] [examples] `RecoverableNetworkWordCount.scala`
- [ ] [examples] `JavaRecoverableNetworkWordCount.java`
- [ ] [examples] `recoverable_network_wordcount.py`

## How was this patch tested?

Ran the examples and verified results manually.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #12981 from lw-lin/accumulatorV2-examples.
parent 5eea3323
No related branches found
No related tags found
No related merge requests found
...@@ -1395,13 +1395,13 @@ object WordBlacklist { ...@@ -1395,13 +1395,13 @@ object WordBlacklist {
object DroppedWordsCounter { object DroppedWordsCounter {
@volatile private var instance: Accumulator[Long] = null @volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): Accumulator[Long] = { def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) { if (instance == null) {
synchronized { synchronized {
if (instance == null) { if (instance == null) {
instance = sc.accumulator(0L, "WordsInBlacklistCounter") instance = sc.longAccumulator("WordsInBlacklistCounter")
} }
} }
} }
...@@ -1409,7 +1409,7 @@ object DroppedWordsCounter { ...@@ -1409,7 +1409,7 @@ object DroppedWordsCounter {
} }
} }
wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast // Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext) val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator // Get or register the droppedWordsCounter Accumulator
...@@ -1417,12 +1417,12 @@ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { ...@@ -1417,12 +1417,12 @@ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// Use blacklist to drop words and use droppedWordsCounter to count them // Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) => val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) { if (blacklist.value.contains(word)) {
droppedWordsCounter += count droppedWordsCounter.add(count)
false false
} else { } else {
true true
} }
}.collect() }.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts val output = "Counts at time " + time + " " + counts
}) })
......
...@@ -27,8 +27,7 @@ import org.apache.spark.{SparkConf, SparkContext} ...@@ -27,8 +27,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time} import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.util.IntParam import org.apache.spark.util.{IntParam, LongAccumulator}
import org.apache.spark.util.LongAccumulator
/** /**
* Use this singleton to get or register a Broadcast variable. * Use this singleton to get or register a Broadcast variable.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment