diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 5ea2e5549d7df731c18c22cb07ec482b0619ad69..4eacc47da5699fe129dfcc6a77cd1ad87faa5e33 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -63,7 +63,8 @@ class TwitterReceiver( storageLevel: StorageLevel ) extends Receiver[Status](storageLevel) with Logging { - private var twitterStream: TwitterStream = _ + @volatile private var twitterStream: TwitterStream = _ + @volatile private var stopped = false def onStart() { try { @@ -78,7 +79,9 @@ class TwitterReceiver( def onScrubGeo(l: Long, l1: Long) {} def onStallWarning(stallWarning: StallWarning) {} def onException(e: Exception) { - restart("Error receiving tweets", e) + if (!stopped) { + restart("Error receiving tweets", e) + } } }) @@ -91,12 +94,14 @@ class TwitterReceiver( } setTwitterStream(newTwitterStream) logInfo("Twitter receiver started") + stopped = false } catch { case e: Exception => restart("Error starting Twitter stream", e) } } def onStop() { + stopped = true setTwitterStream(null) logInfo("Twitter receiver stopped") }