Commit cd12dd9b authored by Tathagata Das

[SPARK-1617] and [SPARK-1618] Improvements to streaming UI and bug fix to socket receiver

1617: These changes expose the receiver state (active or inactive) and the last error in the streaming UI.
1618: If the socket receiver cannot connect on the first attempt, it should retry after a delay. That was broken, because the thread that restarts (and hence stops) the receiver waited on Thread.join on itself! (A minimal sketch of the self-join problem follows the commit list below.)

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #540 from tdas/streaming-ui-fix and squashes the following commits:

e469434 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-ui-fix
dbddf75 [Tathagata Das] Style fix.
66df1a5 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-ui-fix
ad98bc9 [Tathagata Das] Refactored streaming listener to use ReceiverInfo.
d7f849c [Tathagata Das] Revert "Moved BatchInfo from streaming.scheduler to streaming.ui"
5c80919 [Tathagata Das] Moved BatchInfo from streaming.scheduler to streaming.ui
da244f6 [Tathagata Das] Fixed socket receiver as well as made receiver state and error visible in the streaming UI.
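
To make the 1618 deadlock concrete, here is a minimal standalone sketch (not Spark code; the object and thread names are made up) of why joining on the current thread can never succeed, which is exactly what the old onStop()/restart path did:

object SelfJoinDemo {
  def main(args: Array[String]): Unit = {
    val worker = new Thread("worker") {
      override def run(): Unit = {
        println("worker: joining on myself with a 500 ms timeout...")
        // An unbounded Thread.currentThread().join() would block forever, because
        // a thread can never observe its own termination. That is what happened
        // when the receiver's restart path called receivingThread.join() from the
        // receiving thread itself.
        Thread.currentThread().join(500)
        println("worker: the join timed out, as it always will")
      }
    }
    worker.start()
    worker.join()
  }
}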
parent 968c0187
Showing 217 additions and 103 deletions
......@@ -122,7 +122,7 @@ private[spark] object UIUtils extends Logging {
}
}
if (unit.isEmpty) {
"%d".formatLocal(Locale.US, value)
"%d".formatLocal(Locale.US, value.toInt)
} else {
"%.1f%s".formatLocal(Locale.US, value, unit)
}
......
......@@ -50,49 +50,42 @@ class SocketReceiver[T: ClassTag](
storageLevel: StorageLevel
) extends Receiver[T](storageLevel) with Logging {
var socket: Socket = null
var receivingThread: Thread = null
def onStart() {
receivingThread = new Thread("Socket Receiver") {
override def run() {
connect()
receive()
}
}
receivingThread.start()
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
def onStop() {
if (socket != null) {
socket.close()
}
socket = null
if (receivingThread != null) {
receivingThread.join()
}
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself when isStopped() returns true
}
def connect() {
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
} catch {
case e: Exception =>
restart("Could not connect to " + host + ":" + port, e)
}
}
def receive() {
try {
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
logInfo("Stopped receiving")
restart("Retrying connecting to " + host + ":" + port)
} catch {
case e: Exception =>
restart("Error receiving data from socket", e)
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
restart("Error receiving data", t)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
}
}
}
......
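
A minimal driver sketch that exercises the fixed socket receiver (assuming a local TCP source such as nc -lk 9999; the object name is illustrative). With the retry fix, the stream recovers when the source is not yet up at receiver start instead of failing permanently:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketStreamExample")
    val ssc = new StreamingContext(conf, Seconds(1))
    // socketTextStream uses the socket receiver patched above.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}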
......@@ -28,8 +28,13 @@ import akka.actor.SupervisorStrategy.{Escalate, Restart}
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.storage.StorageLevel
import java.nio.ByteBuffer
import org.apache.spark.annotation.DeveloperApi
/** A helper with set of defaults for supervisor strategy */
/**
* :: DeveloperApi ::
* A helper with set of defaults for supervisor strategy
*/
@DeveloperApi
object ActorSupervisorStrategy {
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
......@@ -40,6 +45,7 @@ object ActorSupervisorStrategy {
}
/**
* :: DeveloperApi ::
* A receiver trait to be mixed in with your Actor to gain access to
* the API for pushing received data into Spark Streaming for being processed.
*
......@@ -61,6 +67,7 @@ object ActorSupervisorStrategy {
* to ensure the type safety, i.e parametrized type of push block and InputDStream
* should be same.
*/
@DeveloperApi
trait ActorHelper {
self: Actor => // to ensure that this can be added to Actor classes only
......@@ -92,10 +99,12 @@ trait ActorHelper {
}
/**
* :: DeveloperApi ::
* Statistics for querying the supervisor about state of workers. Used in
* conjunction with `StreamingContext.actorStream` and
* [[org.apache.spark.streaming.receiver.ActorHelper]].
*/
@DeveloperApi
case class Statistics(numberOfMsgs: Int,
numberOfWorkers: Int,
numberOfHiccups: Int,
......@@ -188,4 +197,3 @@ private[streaming] class ActorReceiver[T: ClassTag](
supervisor ! PoisonPill
}
}
......@@ -23,8 +23,10 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Abstract class of a receiver that can be run on worker nodes to receive external data. A
* custom receiver can be defined by defining the functions onStart() and onStop(). onStart()
* should define the setup steps necessary to start receiving data,
......@@ -51,6 +53,7 @@ import org.apache.spark.storage.StorageLevel
* }
* }}}
*/
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
/**
......@@ -198,7 +201,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
/** Check if receiver has been marked for stopping. */
def isStopped(): Boolean = {
!executor.isReceiverStarted()
executor.isReceiverStopped()
}
/** Get unique identifier of this receiver. */
......
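
For reference, a sketch of a custom receiver written against this API, following the onStart()/onStop()/isStopped() contract described in the scaladoc above (LineReceiver and its line-oriented source are illustrative only, not part of Spark):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart(): Unit = {
    // Receiving runs on its own daemon thread, as in SocketReceiver above.
    new Thread("LineReceiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  // Nothing to do here: the receiving loop exits once isStopped() returns true.
  def onStop(): Unit = { }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line) // hand each record to Spark Streaming
        line = reader.readLine()
      }
      restart("Retrying connecting to " + host + ":" + port)
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    } finally {
      if (socket != null) socket.close()
    }
  }
}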
......@@ -18,6 +18,6 @@
package org.apache.spark.streaming.receiver
/** Messages sent to the NetworkReceiver. */
private[streaming] sealed trait NetworkReceiverMessage
private[streaming] object StopReceiver extends NetworkReceiverMessage
private[streaming] sealed trait ReceiverMessage
private[streaming] object StopReceiver extends ReceiverMessage
......@@ -88,15 +88,29 @@ private[streaming] abstract class ReceiverSupervisor(
/** Report errors. */
def reportError(message: String, throwable: Throwable)
/** Start the executor */
/** Called when supervisor is started */
protected def onStart() { }
/** Called when supervisor is stopped */
protected def onStop(message: String, error: Option[Throwable]) { }
/** Called when receiver is started */
protected def onReceiverStart() { }
/** Called when receiver is stopped */
protected def onReceiverStop(message: String, error: Option[Throwable]) { }
/** Start the supervisor */
def start() {
onStart()
startReceiver()
}
/** Mark the executor and the receiver for stopping */
/** Mark the supervisor and the receiver for stopping */
def stop(message: String, error: Option[Throwable]) {
stoppingError = error.orNull
stopReceiver(message, error)
onStop(message, error)
stopLatch.countDown()
}
......@@ -104,6 +118,8 @@ private[streaming] abstract class ReceiverSupervisor(
def startReceiver(): Unit = synchronized {
try {
logInfo("Starting receiver")
receiver.onStart()
logInfo("Called receiver onStart")
onReceiverStart()
receiverState = Started
} catch {
......@@ -115,7 +131,10 @@ private[streaming] abstract class ReceiverSupervisor(
/** Stop receiver */
def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
try {
logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
receiverState = Stopped
receiver.onStop()
logInfo("Called receiver onStop")
onReceiverStop(message, error)
} catch {
case t: Throwable =>
......@@ -130,41 +149,32 @@ private[streaming] abstract class ReceiverSupervisor(
/** Restart receiver with delay */
def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
error.getOrElse(null))
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
future {
Future {
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
error.getOrElse(null))
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
logDebug("Sleeping for " + delay)
Thread.sleep(delay)
logDebug("Starting receiver again")
logInfo("Starting receiver again")
startReceiver()
logInfo("Receiver started again")
}
}
/** Called when the receiver needs to be started */
protected def onReceiverStart(): Unit = synchronized {
// Call user-defined onStart()
logInfo("Calling receiver onStart")
receiver.onStart()
logInfo("Called receiver onStart")
}
/** Called when the receiver needs to be stopped */
protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = synchronized {
// Call user-defined onStop()
logInfo("Calling receiver onStop")
receiver.onStop()
logInfo("Called receiver onStop")
}
/** Check if receiver has been marked for stopping */
def isReceiverStarted() = {
logDebug("state = " + receiverState)
receiverState == Started
}
/** Wait the thread until the executor is stopped */
/** Check if receiver has been marked for stopping */
def isReceiverStopped() = {
logDebug("state = " + receiverState)
receiverState == Stopped
}
/** Wait the thread until the supervisor is stopped */
def awaitTermination() {
stopLatch.await()
logInfo("Waiting for executor stop is over")
......
......@@ -79,6 +79,8 @@ private[streaming] class ReceiverSupervisorImpl(
logInfo("Received stop signal")
stop("Stopped by driver", None)
}
def ref = self
}), "Receiver-" + streamId + "-" + System.currentTimeMillis())
/** Unique block ids if one wants to add blocks directly */
......@@ -154,14 +156,23 @@ private[streaming] class ReceiverSupervisorImpl(
logWarning("Reported error " + message + " - " + error)
}
override def onReceiverStart() {
override protected def onStart() {
blockGenerator.start()
super.onReceiverStart()
}
override def onReceiverStop(message: String, error: Option[Throwable]) {
super.onReceiverStop(message, error)
override protected def onStop(message: String, error: Option[Throwable]) {
blockGenerator.stop()
env.actorSystem.stop(actor)
}
override protected def onReceiverStart() {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
val future = trackerActor.ask(msg)(askTimeout)
Await.result(future, askTimeout)
}
override protected def onReceiverStop(message: String, error: Option[Throwable]) {
logInfo("Deregistering receiver " + streamId)
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
val future = trackerActor.ask(
......@@ -170,11 +181,6 @@ private[streaming] class ReceiverSupervisorImpl(
logInfo("Stopped receiver " + streamId)
}
override def stop(message: String, error: Option[Throwable]) {
super.stop(message, error)
env.actorSystem.stop(actor)
}
/** Generate new block ID */
private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement)
}
......@@ -18,8 +18,10 @@
package org.apache.spark.streaming.scheduler
import org.apache.spark.streaming.Time
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Class having information on completed batches.
* @param batchTime Time of the batch
* @param submissionTime Clock time of when jobs of this batch was submitted to
......@@ -27,6 +29,7 @@ import org.apache.spark.streaming.Time
* @param processingStartTime Clock time of when the first job of this batch started processing
* @param processingEndTime Clock time of when the last job of this batch finished processing
*/
@DeveloperApi
case class BatchInfo(
batchTime: Time,
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.scheduler
import akka.actor.ActorRef
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Class having information about a receiver
*/
@DeveloperApi
case class ReceiverInfo(
streamId: Int,
name: String,
private[streaming] val actor: ActorRef,
active: Boolean,
location: String,
lastErrorMessage: String = "",
lastError: String = ""
) {
}
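
Since ReceiverInfo is now what the receiver listener events carry, here is a small sketch of a custom StreamingListener that logs the same state the UI shows (ReceiverStateLogger is hypothetical; ssc is an existing StreamingContext):

import org.apache.spark.streaming.scheduler._

class ReceiverStateLogger extends StreamingListener {
  override def onReceiverStarted(started: StreamingListenerReceiverStarted): Unit = {
    val info = started.receiverInfo
    println(s"Receiver ${info.name} (stream ${info.streamId}) started on ${info.location}")
  }

  override def onReceiverError(error: StreamingListenerReceiverError): Unit = {
    val info = error.receiverInfo
    println(s"Receiver ${info.name} error: ${info.lastErrorMessage} - ${info.lastError}")
  }

  override def onReceiverStopped(stopped: StreamingListenerReceiverStopped): Unit = {
    val info = stopped.receiverInfo
    println(s"Receiver ${info.name} stopped (active = ${info.active})")
  }
}

// Register it on the driver: ssc.addStreamingListener(new ReceiverStateLogger)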
......@@ -28,13 +28,8 @@ import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.util.AkkaUtils
/** Information about receiver */
case class ReceiverInfo(streamId: Int, typ: String, location: String) {
override def toString = s"$typ-$streamId"
}
/** Information about blocks received by the receiver */
case class ReceivedBlockInfo(
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
blockId: StreamBlockId,
numRecords: Long,
......@@ -69,7 +64,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
val receiverInputStreams = ssc.graph.getReceiverInputStreams()
val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
val receiverExecutor = new ReceiverLauncher()
val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef]
val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
val timeout = AkkaUtils.askTimeout(ssc.conf)
......@@ -129,17 +124,23 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
if (!receiverInputStreamMap.contains(streamId)) {
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
ReceiverInfo(streamId, typ, host)
))
receiverInfo(streamId) = ReceiverInfo(
streamId, s"${typ}-${streamId}", receiverActor, true, host)
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address)
}
/** Deregister a receiver */
def deregisterReceiver(streamId: Int, message: String, error: String) {
receiverInfo -= streamId
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(streamId, message, error))
val newReceiverInfo = receiverInfo.get(streamId) match {
case Some(oldInfo) =>
oldInfo.copy(actor = null, active = false, lastErrorMessage = message, lastError = error)
case None =>
logWarning("No prior receiver info")
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
}
receiverInfo(streamId) = newReceiverInfo
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
} else {
......@@ -157,7 +158,15 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
/** Report error sent by a receiver */
def reportError(streamId: Int, message: String, error: String) {
ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(streamId, message, error))
val newReceiverInfo = receiverInfo.get(streamId) match {
case Some(oldInfo) =>
oldInfo.copy(lastErrorMessage = message, lastError = error)
case None =>
logWarning("No prior receiver info")
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
}
receiverInfo(streamId) = newReceiverInfo
ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
} else {
......@@ -270,7 +279,8 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
/** Stops the receivers. */
private def stopReceivers() {
// Signal the receivers to stop
receiverInfo.values.foreach(_ ! StopReceiver)
receiverInfo.values.flatMap { info => Option(info.actor)}
.foreach { _ ! StopReceiver }
logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
}
}
......
......@@ -20,28 +20,45 @@ package org.apache.spark.streaming.scheduler
import scala.collection.mutable.Queue
import org.apache.spark.util.Distribution
import org.apache.spark.annotation.DeveloperApi
/** Base trait for events related to StreamingListener */
/**
* :: DeveloperApi ::
* Base trait for events related to StreamingListener
*/
@DeveloperApi
sealed trait StreamingListenerEvent
@DeveloperApi
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
@DeveloperApi
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
@DeveloperApi
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
@DeveloperApi
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
case class StreamingListenerReceiverError(streamId: Int, message: String, error: String)
@DeveloperApi
case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
case class StreamingListenerReceiverStopped(streamId: Int, message: String, error: String)
@DeveloperApi
case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
/**
* :: DeveloperApi ::
* A listener interface for receiving information about an ongoing streaming
* computation.
*/
@DeveloperApi
trait StreamingListener {
/** Called when a receiver has been started */
......@@ -65,9 +82,11 @@ trait StreamingListener {
/**
* :: DeveloperApi ::
* A simple StreamingListener that logs summary statistics across Spark Streaming batches
* @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
*/
@DeveloperApi
class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
// Queue containing latest completed batches
val batchInfos = new Queue[BatchInfo]()
......
......@@ -23,9 +23,9 @@ import scala.collection.mutable.{Queue, HashMap}
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.scheduler.ReceiverInfo
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
import org.apache.spark.Logging
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
......@@ -40,9 +40,21 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
val batchDuration = ssc.graph.batchDuration.milliseconds
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
synchronized {
receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
receiverInfos(receiverStarted.receiverInfo.streamId) = receiverStarted.receiverInfo
}
}
override def onReceiverError(receiverError: StreamingListenerReceiverError) {
synchronized {
receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo
}
}
override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
synchronized {
receiverInfos(receiverStopped.receiverInfo.streamId) = receiverStopped.receiverInfo
}
}
......
......@@ -78,25 +78,33 @@ private[ui] class StreamingPage(parent: StreamingTab)
val table = if (receivedRecordDistributions.size > 0) {
val headerRow = Seq(
"Receiver",
"Status",
"Location",
"Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
"Minimum rate\n[records/sec]",
"25th percentile rate\n[records/sec]",
"Median rate\n[records/sec]",
"75th percentile rate\n[records/sec]",
"Maximum rate\n[records/sec]"
"Maximum rate\n[records/sec]",
"Last Error"
)
val dataRows = (0 until listener.numReceivers).map { receiverId =>
val receiverInfo = listener.receiverInfo(receiverId)
val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
val receiverActive = receiverInfo.map { info =>
if (info.active) "ACTIVE" else "INACTIVE"
}.getOrElse(emptyCell)
val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
d.getQuantiles().map(r => formatNumber(r.toLong))
d.getQuantiles(Seq(0.0, 0.5, 1.0)).map(r => formatNumber(r.toLong))
}.getOrElse {
Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
}
Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats
val receiverLastError = listener.receiverInfo(receiverId).map { info =>
val msg = s"${info.lastErrorMessage} - ${info.lastError}"
if (msg.size > 100) msg.take(97) + "..." else msg
}.getOrElse(emptyCell)
Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++
receivedRecordStats ++ Seq(receiverLastError)
}
Some(listingTable(headerRow, dataRows))
} else {
......
......@@ -94,9 +94,13 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
// Verify restarting actually stops and starts the receiver
receiver.restart("restarting", null, 100)
assert(receiver.isStopped)
assert(receiver.onStopCalled)
eventually(timeout(50 millis), interval(10 millis)) {
// receiver will be stopped async
assert(receiver.isStopped)
assert(receiver.onStopCalled)
}
eventually(timeout(1000 millis), interval(100 millis)) {
// receiver will be started async
assert(receiver.onStartCalled)
assert(executor.isReceiverStarted)
assert(receiver.isStarted)
......
......@@ -66,7 +66,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
test("receiver info reporting") {
val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver)
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD(_.count)
val collector = new ReceiverInfoCollector
......@@ -75,8 +75,8 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
ssc.start()
try {
eventually(timeout(1000 millis), interval(20 millis)) {
collector.startedReceiverInfo should have size 1
collector.startedReceiverInfo(0).streamId should equal (0)
collector.startedReceiverStreamIds.size should be >= 1
collector.startedReceiverStreamIds(0) should equal (0)
collector.stoppedReceiverStreamIds should have size 1
collector.stoppedReceiverStreamIds(0) should equal (0)
collector.receiverErrors should have size 1
......@@ -108,20 +108,21 @@ class BatchInfoCollector extends StreamingListener {
/** Listener that collects information on processed batches */
class ReceiverInfoCollector extends StreamingListener {
val startedReceiverInfo = new ArrayBuffer[ReceiverInfo]
val startedReceiverStreamIds = new ArrayBuffer[Int]
val stoppedReceiverStreamIds = new ArrayBuffer[Int]()
val receiverErrors = new ArrayBuffer[(Int, String, String)]()
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
startedReceiverInfo += receiverStarted.receiverInfo
startedReceiverStreamIds += receiverStarted.receiverInfo.streamId
}
override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
stoppedReceiverStreamIds += receiverStopped.streamId
stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId
}
override def onReceiverError(receiverError: StreamingListenerReceiverError) {
receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error))
receiverErrors += ((receiverError.receiverInfo.streamId,
receiverError.receiverInfo.lastErrorMessage, receiverError.receiverInfo.lastError))
}
}
......