Skip to content
Snippets Groups Projects
Commit a45d5480 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-2464][Streaming] Fixed Twitter stream stopping bug

Stopping the Twitter Receiver would call twitter4j's TwitterStream.shutdown, which in turn causes an Exception to be thrown to the listener. This exception caused the Receiver to be restarted. This patch check whether the receiver was stopped or not, and accordingly restarts on exception.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #1577 from tdas/twitter-stop and squashes the following commits:

011b525 [Tathagata Das] Fixed Twitter stream stopping bug.
parent fec641b8
No related branches found
No related tags found
No related merge requests found
...@@ -63,7 +63,8 @@ class TwitterReceiver( ...@@ -63,7 +63,8 @@ class TwitterReceiver(
storageLevel: StorageLevel storageLevel: StorageLevel
) extends Receiver[Status](storageLevel) with Logging { ) extends Receiver[Status](storageLevel) with Logging {
private var twitterStream: TwitterStream = _ @volatile private var twitterStream: TwitterStream = _
@volatile private var stopped = false
def onStart() { def onStart() {
try { try {
...@@ -78,7 +79,9 @@ class TwitterReceiver( ...@@ -78,7 +79,9 @@ class TwitterReceiver(
def onScrubGeo(l: Long, l1: Long) {} def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {} def onStallWarning(stallWarning: StallWarning) {}
def onException(e: Exception) { def onException(e: Exception) {
restart("Error receiving tweets", e) if (!stopped) {
restart("Error receiving tweets", e)
}
} }
}) })
...@@ -91,12 +94,14 @@ class TwitterReceiver( ...@@ -91,12 +94,14 @@ class TwitterReceiver(
} }
setTwitterStream(newTwitterStream) setTwitterStream(newTwitterStream)
logInfo("Twitter receiver started") logInfo("Twitter receiver started")
stopped = false
} catch { } catch {
case e: Exception => restart("Error starting Twitter stream", e) case e: Exception => restart("Error starting Twitter stream", e)
} }
} }
def onStop() { def onStop() {
stopped = true
setTwitterStream(null) setTwitterStream(null)
logInfo("Twitter receiver stopped") logInfo("Twitter receiver stopped")
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment