diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index cf987a1ab02c3ceefc03a55145be29c4ac1e3abd..a3d6a1821245b443e706ff331795d6ef6cfc022f 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -122,7 +122,7 @@ private[spark] object UIUtils extends Logging {
       }
     }
     if (unit.isEmpty) {
-      "%d".formatLocal(Locale.US, value)
+      "%d".formatLocal(Locale.US, value.toInt)
     } else {
       "%.1f%s".formatLocal(Locale.US, value, unit)
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 1e32727eacfa3f7e4fc2ca8695c5464444e14a5a..8b72bcf20653d4df763e0be7c48380cfc47d3ffd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -50,49 +50,42 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
-  var socket: Socket = null
-  var receivingThread: Thread = null
-
   def onStart() {
-    receivingThread = new Thread("Socket Receiver") {
-      override def run() {
-        connect()
-        receive()
-      }
-    }
-    receivingThread.start()
+    // Start the thread that receives data over a connection
+    new Thread("Socket Receiver") {
+      setDaemon(true)
+      override def run() { receive() }
+    }.start()
   }
 
   def onStop() {
-    if (socket != null) {
-      socket.close()
-    }
-    socket = null
-    if (receivingThread != null) {
-      receivingThread.join()
-    }
+    // There is nothing much to do as the thread calling receive()
+    // is designed to stop by itself once isStopped() returns true
   }
 
-  def connect() {
+  /** Create a socket connection and receive data until receiver is stopped */
+  def receive() {
+    var socket: Socket = null
     try {
       logInfo("Connecting to " + host + ":" + port)
       socket = new Socket(host, port)
-    } catch {
-      case e: Exception =>
-        restart("Could not connect to " + host + ":" + port, e)
-    }
-  }
-
-  def receive() {
-    try {
       logInfo("Connected to " + host + ":" + port)
       val iterator = bytesToObjects(socket.getInputStream())
       while(!isStopped && iterator.hasNext) {
         store(iterator.next)
       }
+      logInfo("Stopped receiving")
+      restart("Retrying connecting to " + host + ":" + port)
     } catch {
-      case e: Exception =>
-        restart("Error receiving data from socket", e)
+      case e: java.net.ConnectException =>
+        restart("Error connecting to " + host + ":" + port, e)
+      case t: Throwable =>
+        restart("Error receiving data", t)
+    } finally {
+      if (socket != null) {
+        socket.close()
+        logInfo("Closed socket to " + host + ":" + port)
+      }
     }
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
index 821cf19481d44fd6bd80cc93786436ea3cfdd26e..743be58950c0936ea28c220a200fe7f2f34edae1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -28,8 +28,13 @@ import akka.actor.SupervisorStrategy.{Escalate, Restart}
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.storage.StorageLevel
 import java.nio.ByteBuffer
+import org.apache.spark.annotation.DeveloperApi
 
-/** A helper with set of defaults for supervisor strategy */
+/**
+ * :: DeveloperApi ::
+ * A helper with a set of defaults for supervisor strategy
+ */
+@DeveloperApi
 object ActorSupervisorStrategy {
 
   val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
@@ -40,6 +45,7 @@ object ActorSupervisorStrategy {
 }
 
 /**
+ * :: DeveloperApi ::
  * A receiver trait to be mixed in with your Actor to gain access to
  * the API for pushing received data into Spark Streaming for being processed.
  *
@@ -61,6 +67,7 @@ object ActorSupervisorStrategy {
 * to ensure the type safety, i.e parametrized type of push block and InputDStream
 * should be same.
 */
+@DeveloperApi
 trait ActorHelper {
 
   self: Actor => // to ensure that this can be added to Actor classes only
@@ -92,10 +99,12 @@ trait ActorHelper {
 }
 
 /**
+ * :: DeveloperApi ::
 * Statistics for querying the supervisor about state of workers. Used in
 * conjunction with `StreamingContext.actorStream` and
 * [[org.apache.spark.streaming.receiver.ActorHelper]].
 */
+@DeveloperApi
 case class Statistics(numberOfMsgs: Int,
   numberOfWorkers: Int,
   numberOfHiccups: Int,
@@ -188,4 +197,3 @@ private[streaming] class ActorReceiver[T: ClassTag](
     supervisor ! PoisonPill
   }
 }
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 44eecf1dd256733b9e244c0f5694315caeef7f0a..524c1b8d8ce46efa6c8a49b8907568d2b4cbe9e3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -23,8 +23,10 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.annotation.DeveloperApi
 
 /**
+ * :: DeveloperApi ::
 * Abstract class of a receiver that can be run on worker nodes to receive external data. A
 * custom receiver can be defined by defining the functions onStart() and onStop(). onStart()
 * should define the setup steps necessary to start receiving data,
@@ -51,6 +53,7 @@ import org.apache.spark.storage.StorageLevel
 * }
 * }}}
 */
+@DeveloperApi
 abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
 
   /**
@@ -198,7 +201,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
 
   /** Check if receiver has been marked for stopping. */
   def isStopped(): Boolean = {
-    !executor.isReceiverStarted()
+    executor.isReceiverStopped()
   }
 
   /** Get unique identifier of this receiver. */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
index 6ab3ca6ea5fa64f0db97f56c6941cf724286acfa..bf39d1e891caee698fb2a780aa68302d5ff7df2a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -18,6 +18,6 @@
 package org.apache.spark.streaming.receiver
 
-/** Messages sent to the NetworkReceiver. */
-private[streaming] sealed trait NetworkReceiverMessage
-private[streaming] object StopReceiver extends NetworkReceiverMessage
+/** Messages sent to the Receiver. */
+private[streaming] sealed trait ReceiverMessage
+private[streaming] object StopReceiver extends ReceiverMessage
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 256b3335e49aa6923c0758a6e259a92e4618ddd5..09be3a50d2dfa233c07fb55b3504e6e662813aa8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -88,15 +88,29 @@ private[streaming] abstract class ReceiverSupervisor(
   /** Report errors. */
   def reportError(message: String, throwable: Throwable)
 
-  /** Start the executor */
+  /** Called when supervisor is started */
+  protected def onStart() { }
+
+  /** Called when supervisor is stopped */
+  protected def onStop(message: String, error: Option[Throwable]) { }
+
+  /** Called when receiver is started */
+  protected def onReceiverStart() { }
+
+  /** Called when receiver is stopped */
+  protected def onReceiverStop(message: String, error: Option[Throwable]) { }
+
+  /** Start the supervisor */
   def start() {
+    onStart()
     startReceiver()
   }
 
-  /** Mark the executor and the receiver for stopping */
+  /** Mark the supervisor and the receiver for stopping */
   def stop(message: String, error: Option[Throwable]) {
     stoppingError = error.orNull
     stopReceiver(message, error)
+    onStop(message, error)
     stopLatch.countDown()
   }
 
@@ -104,6 +118,8 @@ private[streaming] abstract class ReceiverSupervisor(
   def startReceiver(): Unit = synchronized {
     try {
       logInfo("Starting receiver")
+      receiver.onStart()
+      logInfo("Called receiver onStart")
       onReceiverStart()
       receiverState = Started
     } catch {
@@ -115,7 +131,10 @@ private[streaming] abstract class ReceiverSupervisor(
   /** Stop receiver */
   def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
     try {
+      logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
       receiverState = Stopped
+      receiver.onStop()
+      logInfo("Called receiver onStop")
       onReceiverStop(message, error)
     } catch {
       case t: Throwable =>
@@ -130,41 +149,32 @@ private[streaming] abstract class ReceiverSupervisor(
 
   /** Restart receiver with delay */
   def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
-    logWarning("Restarting receiver with delay " + delay + " ms: " + message,
-      error.getOrElse(null))
-    stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
-    future {
+    Future {
+      logWarning("Restarting receiver with delay " + delay + " ms: " + message,
+        error.getOrElse(null))
+      stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
       logDebug("Sleeping for " + delay)
       Thread.sleep(delay)
-      logDebug("Starting receiver again")
+      logInfo("Starting receiver again")
      startReceiver()
       logInfo("Receiver started again")
     }
   }
 
-  /** Called when the receiver needs to be started */
-  protected def onReceiverStart(): Unit = synchronized {
-    // Call user-defined onStart()
-    logInfo("Calling receiver onStart")
-    receiver.onStart()
-    logInfo("Called receiver onStart")
-  }
-
-  /** Called when the receiver needs to be stopped */
-  protected def onReceiverStop(message: String, error: Option[Throwable]): Unit = synchronized {
-    // Call user-defined onStop()
-    logInfo("Calling receiver onStop")
-    receiver.onStop()
- logInfo("Called receiver onStop") - } - /** Check if receiver has been marked for stopping */ def isReceiverStarted() = { logDebug("state = " + receiverState) receiverState == Started } - /** Wait the thread until the executor is stopped */ + /** Check if receiver has been marked for stopping */ + def isReceiverStopped() = { + logDebug("state = " + receiverState) + receiverState == Stopped + } + + + /** Wait the thread until the supervisor is stopped */ def awaitTermination() { stopLatch.await() logInfo("Waiting for executor stop is over") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 2a3521bd46ae7d3548741014e75b0c5588c3bec0..ce8316bb148912bd8020826ffce5c16a6ef50efa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -79,6 +79,8 @@ private[streaming] class ReceiverSupervisorImpl( logInfo("Received stop signal") stop("Stopped by driver", None) } + + def ref = self }), "Receiver-" + streamId + "-" + System.currentTimeMillis()) /** Unique block ids if one wants to add blocks directly */ @@ -154,14 +156,23 @@ private[streaming] class ReceiverSupervisorImpl( logWarning("Reported error " + message + " - " + error) } - override def onReceiverStart() { + override protected def onStart() { blockGenerator.start() - super.onReceiverStart() } - override def onReceiverStop(message: String, error: Option[Throwable]) { - super.onReceiverStop(message, error) + override protected def onStop(message: String, error: Option[Throwable]) { blockGenerator.stop() + env.actorSystem.stop(actor) + } + + override protected def onReceiverStart() { + val msg = RegisterReceiver( + streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor) + val future = trackerActor.ask(msg)(askTimeout) + Await.result(future, askTimeout) + } + + override protected def onReceiverStop(message: String, error: Option[Throwable]) { logInfo("Deregistering receiver " + streamId) val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("") val future = trackerActor.ask( @@ -170,11 +181,6 @@ private[streaming] class ReceiverSupervisorImpl( logInfo("Stopped receiver " + streamId) } - override def stop(message: String, error: Option[Throwable]) { - super.stop(message, error) - env.actorSystem.stop(actor) - } - /** Generate new block ID */ private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 9c69a2a4e21f56b7d12be1d22f7a81c4b1c263f2..a68aecb881117437712d131b31fd4974bdd3e3d9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -18,8 +18,10 @@ package org.apache.spark.streaming.scheduler import org.apache.spark.streaming.Time +import org.apache.spark.annotation.DeveloperApi /** + * :: DeveloperApi :: * Class having information on completed batches. 
 * @param batchTime Time of the batch
-* @param submissionTime Clock time of when jobs of this batch was submitted to
+* @param submissionTime Clock time of when jobs of this batch were submitted to
@@ -27,6 +29,7 @@ import org.apache.spark.streaming.Time
 * @param processingStartTime Clock time of when the first job of this batch started processing
 * @param processingEndTime Clock time of when the last job of this batch finished processing
 */
+@DeveloperApi
 case class BatchInfo(
     batchTime: Time,
     receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d7e39c528c519172952b6e48928066ebf46c82e2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import akka.actor.ActorRef
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Class having information about a receiver
+ */
+@DeveloperApi
+case class ReceiverInfo(
+    streamId: Int,
+    name: String,
+    private[streaming] val actor: ActorRef,
+    active: Boolean,
+    location: String,
+    lastErrorMessage: String = "",
+    lastError: String = ""
+  ) {
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 557e0961d5944fefc081dfee87bd509e9b54d8aa..5307fe189d717b181075bc85010521f0947ee8b4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -28,13 +28,8 @@ import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
 import org.apache.spark.util.AkkaUtils
 
-/** Information about receiver */
-case class ReceiverInfo(streamId: Int, typ: String, location: String) {
-  override def toString = s"$typ-$streamId"
-}
-
 /** Information about blocks received by the receiver */
-case class ReceivedBlockInfo(
+private[streaming] case class ReceivedBlockInfo(
     streamId: Int,
     blockId: StreamBlockId,
     numRecords: Long,
@@ -69,7 +64,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
   val receiverInputStreams = ssc.graph.getReceiverInputStreams()
   val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
   val receiverExecutor = new ReceiverLauncher()
-  val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef]
+  val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
   val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
     with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
   val timeout = AkkaUtils.askTimeout(ssc.conf)
@@ -129,17 +124,23 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
     if (!receiverInputStreamMap.contains(streamId)) {
       throw new Exception("Register received for unexpected id " + streamId)
     }
-    receiverInfo += ((streamId, receiverActor))
-    ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
-      ReceiverInfo(streamId, typ, host)
-    ))
+    receiverInfo(streamId) = ReceiverInfo(
+      streamId, s"${typ}-${streamId}", receiverActor, true, host)
+    ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
     logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address)
   }
 
   /** Deregister a receiver */
   def deregisterReceiver(streamId: Int, message: String, error: String) {
-    receiverInfo -= streamId
-    ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(streamId, message, error))
+    val newReceiverInfo = receiverInfo.get(streamId) match {
+      case Some(oldInfo) =>
+        oldInfo.copy(actor = null, active = false, lastErrorMessage = message, lastError = error)
+      case None =>
+        logWarning("No prior receiver info")
+        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+    }
+    receiverInfo(streamId) = newReceiverInfo
+    ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
     } else {
@@ -157,7 +158,15 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
 
   /** Report error sent by a receiver */
   def reportError(streamId: Int, message: String, error: String) {
-    ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(streamId, message, error))
+    val newReceiverInfo = receiverInfo.get(streamId) match {
+      case Some(oldInfo) =>
+        oldInfo.copy(lastErrorMessage = message, lastError = error)
+      case None =>
+        logWarning("No prior receiver info")
+        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
+    }
+    receiverInfo(streamId) = newReceiverInfo
+    ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
     } else {
@@ -270,7 +279,8 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
   /** Stops the receivers. */
   private def stopReceivers() {
     // Signal the receivers to stop
-    receiverInfo.values.foreach(_ ! StopReceiver)
+    receiverInfo.values.flatMap { info => Option(info.actor) }
+      .foreach { _ ! StopReceiver }
     logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 9d6ec1fa335492ee2f4a9018fc3e5d2c68ec3074..ed1aa114e19d9f195df8fb00aca82f75e840e9fa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -20,28 +20,45 @@ package org.apache.spark.streaming.scheduler
 import scala.collection.mutable.Queue
 
 import org.apache.spark.util.Distribution
+import org.apache.spark.annotation.DeveloperApi
 
-/** Base trait for events related to StreamingListener */
+/**
+ * :: DeveloperApi ::
+ * Base trait for events related to StreamingListener
+ */
+@DeveloperApi
 sealed trait StreamingListenerEvent
 
+@DeveloperApi
 case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
+@DeveloperApi
 case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
+
+@DeveloperApi
 case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
 
+@DeveloperApi
 case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
   extends StreamingListenerEvent
-case class StreamingListenerReceiverError(streamId: Int, message: String, error: String)
+
+@DeveloperApi
+case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
   extends StreamingListenerEvent
-case class StreamingListenerReceiverStopped(streamId: Int, message: String, error: String)
+
+@DeveloperApi
+case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
   extends StreamingListenerEvent
 
 /** An event used in the listener to shutdown the listener daemon thread. */
 private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
 
 /**
+ * :: DeveloperApi ::
 * A listener interface for receiving information about an ongoing streaming
 * computation.
 */
+@DeveloperApi
 trait StreamingListener {
 
   /** Called when a receiver has been started */
@@ -65,9 +82,11 @@ trait StreamingListener {
 
 
 /**
+ * :: DeveloperApi ::
 * A simple StreamingListener that logs summary statistics across Spark Streaming batches
 * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
 */
+@DeveloperApi
 class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
   // Queue containing latest completed batches
   val batchInfos = new Queue[BatchInfo]()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 14c33c728bfe1a8e93aedd65f965001e08f34deb..f61069b56db5ee209b0b5b6fc176c0611d55fc16 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -23,9 +23,9 @@ import scala.collection.mutable.{Queue, HashMap}
 import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
 import org.apache.spark.streaming.scheduler.BatchInfo
-import org.apache.spark.streaming.scheduler.ReceiverInfo
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
 import org.apache.spark.util.Distribution
+import org.apache.spark.Logging
 
 private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -40,9 +40,21 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
 
   val batchDuration = ssc.graph.batchDuration.milliseconds
 
-  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
+  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
     synchronized {
-      receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
+      receiverInfos(receiverStarted.receiverInfo.streamId) = receiverStarted.receiverInfo
+    }
+  }
+
+  override def onReceiverError(receiverError: StreamingListenerReceiverError) {
+    synchronized {
+      receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo
+    }
+  }
+
+  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
+    synchronized {
+      receiverInfos(receiverStopped.receiverInfo.streamId) = receiverStopped.receiverInfo
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 8fe1219356cdc22f2b935c3d2208bb093dc92b3f..451b23e01c99545bc031188c261ca2df5dfe30d3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -78,25 +78,33 @@ private[ui] class StreamingPage(parent: StreamingTab)
     val table = if (receivedRecordDistributions.size > 0) {
       val headerRow = Seq(
         "Receiver",
+        "Status",
         "Location",
         "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
         "Minimum rate\n[records/sec]",
-        "25th percentile rate\n[records/sec]",
         "Median rate\n[records/sec]",
-        "75th percentile rate\n[records/sec]",
-        "Maximum rate\n[records/sec]"
+        "Maximum rate\n[records/sec]",
+        "Last Error"
       )
       val dataRows = (0 until listener.numReceivers).map { receiverId =>
         val receiverInfo = listener.receiverInfo(receiverId)
-        val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
+        val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
+        val receiverActive = receiverInfo.map { info =>
+          if (info.active) "ACTIVE" else "INACTIVE"
+        }.getOrElse(emptyCell)
         val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
         val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
         val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
-          d.getQuantiles().map(r => formatNumber(r.toLong))
+          d.getQuantiles(Seq(0.0, 0.5, 1.0)).map(r => formatNumber(r.toLong))
         }.getOrElse {
-          Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
+          Seq(emptyCell, emptyCell, emptyCell)
         }
-        Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats
+        val receiverLastError = listener.receiverInfo(receiverId).map { info =>
+          val msg = s"${info.lastErrorMessage} - ${info.lastError}"
+          if (msg.size > 100) msg.take(97) + "..." else msg
+        }.getOrElse(emptyCell)
+        Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++
+          receivedRecordStats ++ Seq(receiverLastError)
       }
       Some(listingTable(headerRow, dataRows))
     } else {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index ff3619a59042da464ccf1d2d3d699c9e76fbd756..303d149d285e182d66ce83035ce90d99e58f0e81 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -94,9 +94,13 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
 
     // Verify restarting actually stops and starts the receiver
     receiver.restart("restarting", null, 100)
-    assert(receiver.isStopped)
-    assert(receiver.onStopCalled)
+    eventually(timeout(50 millis), interval(10 millis)) {
+      // the receiver will be stopped asynchronously
+      assert(receiver.isStopped)
+      assert(receiver.onStopCalled)
+    }
     eventually(timeout(1000 millis), interval(100 millis)) {
+      // the receiver will be started asynchronously
       assert(receiver.onStartCalled)
       assert(executor.isReceiverStarted)
       assert(receiver.isStarted)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 458dd3a2b13d8348f88a4602b4b102783b447faf..ef0efa552ceaff5904ff1c4395cf6e6949ee20ae 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -66,7 +66,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
 
   test("receiver info reporting") {
     val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
-    val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver)
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD(_.count)
 
     val collector = new ReceiverInfoCollector
@@ -75,8 +75,8 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
     ssc.start()
     try {
       eventually(timeout(1000 millis), interval(20 millis)) {
-        collector.startedReceiverInfo should have size 1
-        collector.startedReceiverInfo(0).streamId should equal (0)
+        collector.startedReceiverStreamIds.size should be >= 1
+        collector.startedReceiverStreamIds(0) should equal (0)
         collector.stoppedReceiverStreamIds should have size 1
         collector.stoppedReceiverStreamIds(0) should equal (0)
         collector.receiverErrors should have size 1
@@ -108,20 +108,21 @@ class BatchInfoCollector extends StreamingListener {
 
-/** Listener that collects information on processed batches */
+/** Listener that collects information on receiver events */
 class ReceiverInfoCollector extends StreamingListener {
-  val startedReceiverInfo = new ArrayBuffer[ReceiverInfo]
+  val startedReceiverStreamIds = new ArrayBuffer[Int]
   val stoppedReceiverStreamIds = new ArrayBuffer[Int]()
   val receiverErrors = new ArrayBuffer[(Int, String, String)]()
 
   override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
-    startedReceiverInfo += receiverStarted.receiverInfo
+    startedReceiverStreamIds += receiverStarted.receiverInfo.streamId
   }
 
   override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
-    stoppedReceiverStreamIds += receiverStopped.streamId
+    stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId
   }
 
   override def onReceiverError(receiverError: StreamingListenerReceiverError) {
-    receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error))
+    receiverErrors += ((receiverError.receiverInfo.streamId,
+      receiverError.receiverInfo.lastErrorMessage, receiverError.receiverInfo.lastError))
  }
 }
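
Note for reviewers: the sketch below shows how a third-party receiver would be written against the reworked lifecycle in this patch. It is a hypothetical example, not part of the patch itself: `LineReceiver`, its host/port parameters, and the line-per-record protocol are made up for illustration, while `Receiver`, `store()`, `isStopped()`, `restart()` and `StreamingContext.receiverStream()` are the real API touched above. The contract demonstrated by `SocketReceiver`: `onStart()` must return immediately after spawning a daemon thread, the receiving loop polls `isStopped()` and exits on its own, and `restart()` asks the supervisor to stop and then asynchronously start the receiver again.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical example receiver: reads newline-delimited records from a socket
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Must not block: start the thread that receives data and return
    new Thread("Line Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing to do here: receive() polls isStopped() and terminates on its own
  }

  /** Receive data until the receiver is stopped, restarting on failure */
  private def receive() {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)  // hand one record to Spark Streaming
        line = reader.readLine()
      }
      // Input ended: ask the supervisor to stop and then start the receiver again
      restart("Retrying connecting to " + host + ":" + port)
    } catch {
      case t: Throwable =>
        restart("Error receiving data", t)  // stop-then-start, with the error reported
    } finally {
      if (socket != null) {
        socket.close()
      }
    }
  }
}

// Usage, given a StreamingContext ssc (mirrors the updated StreamingListenerSuite):
//   val lines = ssc.receiverStream(new LineReceiver("localhost", 9999))
```

The daemon flag matters under the new semantics: the supervisor no longer joins the receiving thread in `onStop()` (as the old `SocketReceiver` did), so a non-daemon thread blocked on I/O could otherwise keep the worker JVM alive after the receiver has been stopped.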