Commit ec71b445 authored by Tathagata Das

Minor changes.

parent e93b391d
Showing changed files with 36 additions and 33 deletions
@@ -91,4 +91,3 @@ private[spark] class SparkListenerBus() extends Logging {
     return true
   }
 }
@@ -513,7 +513,10 @@ class StreamingContext private (
     graph.addOutputStream(outputStream)
   }
 
-  def addListener(streamingListener: StreamingListener) {
+  /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+   * receiving system events related to streaming.
+   */
+  def addStreamingListener(streamingListener: StreamingListener) {
     scheduler.listenerBus.addListener(streamingListener)
   }
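
For orientation, a minimal sketch of the renamed API in use. It assumes only what this diff shows elsewhere: the StreamingListener trait's onBatchCompleted callback receives a StreamingListenerBatchCompleted whose batchInfo carries the batch timing fields. The listener name and the println body are illustrative, not part of the commit:

    import org.apache.spark.streaming.scheduler._

    // Illustrative listener that logs each completed batch's processing delay.
    class DelayLoggingListener extends StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
        val info = batchCompleted.batchInfo
        println("Batch " + info.batchTime + " took " +
          info.processingDelay.getOrElse(-1L) + " ms to process")
      }
    }

    // Registered through the renamed method, before the context is started:
    // ssc.addStreamingListener(new DelayLoggingListener)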
@@ -532,20 +535,19 @@ class StreamingContext private (
    * Start the execution of the streams.
    */
   def start() {
     validate()
 
     // Get the network input streams
     val networkInputStreams = graph.getInputStreams().filter(s => s match {
         case n: NetworkInputDStream[_] => true
         case _ => false
       }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray
 
-    // Start the network input tracker (must start before receivers)
     if (networkInputStreams.length > 0) {
+      // Start the network input tracker (must start before receivers)
       networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
       networkInputTracker.start()
     }
     Thread.sleep(1000)
 
     // Start the scheduler
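
An aside on the hunk above: the filter-plus-asInstanceOf pair is the usual pre-collect idiom for selecting by subtype. With the same names, a single collect does the same thing; a sketch, assuming getInputStreams() returns an ordinary Scala collection:

    // Equivalent one-step selection of the network input streams:
    val networkInputStreams = graph.getInputStreams().collect {
      case n: NetworkInputDStream[_] => n
    }.toArray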
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
 import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.StreamingListener
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -687,6 +688,13 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     ssc.remember(duration)
   }
 
+  /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+   * receiving system events related to streaming.
+   */
+  def addStreamingListener(streamingListener: StreamingListener) {
+    ssc.addStreamingListener(streamingListener)
+  }
+
   /**
    * Starts the execution of the streams.
    */
@@ -19,6 +19,9 @@ package org.apache.spark.streaming.scheduler
 
 import org.apache.spark.streaming.Time
 
+/**
+ * Class having information on completed batches.
+ */
 case class BatchInfo(
     batchTime: Time,
     submissionTime: Long,
@@ -32,7 +35,3 @@ case class BatchInfo(
 
   def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
 }
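
To make the Option arithmetic in totalDelay concrete, a worked example. The processingStartTime and processingEndTime fields, and the schedulingDelay/processingDelay definitions (start minus submission, end minus start), sit in the collapsed part of this hunk and are assumptions here:

    // Hypothetical timings for one batch, in epoch milliseconds:
    val info = BatchInfo(
      batchTime = Time(1000),
      submissionTime = 1000L,
      processingStartTime = Some(1200L),  // waited 200 ms in the queue
      processingEndTime = Some(1700L)     // ran for 500 ms
    )
    // schedulingDelay = Some(200), processingDelay = Some(500);
    // totalDelay zips the two Options and adds: Some(700).
    // If either timestamp is None, the zip is empty and totalDelay is None.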
@@ -17,9 +17,11 @@
 
 package org.apache.spark.streaming.scheduler
 
-import java.util.concurrent.atomic.AtomicLong
 import org.apache.spark.streaming.Time
 
+/**
+ * Class representing a Spark computation. It may contain multiple Spark jobs.
+ */
 private[streaming]
 class Job(val time: Time, func: () => _) {
   var id: String = _
@@ -36,12 +38,4 @@ class Job(val time: Time, func: () => _) {
   }
 
   override def toString = id
+}
-
-/*
-private[streaming]
-object Job {
-  val id = new AtomicLong(0)
-  def getNewId() = id.getAndIncrement()
-}
-*/
-}
\ No newline at end of file
@@ -22,6 +22,10 @@ import org.apache.spark.Logging
 import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
 import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
 
+/**
+ * This class generates jobs from DStreams as well as drives checkpointing and cleaning
+ * up DStream metadata.
+ */
 private[streaming]
 class JobGenerator(jobScheduler: JobScheduler) extends Logging {
@@ -23,7 +23,9 @@ import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
 import scala.collection.mutable.HashSet
 
 import org.apache.spark.streaming._
 
+/**
+ * This class drives the generation of Spark jobs from the DStreams.
+ */
 private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {
@@ -50,19 +50,13 @@ trait StreamingListener {
  * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
  */
 class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
-  import org.apache.spark
 
   // Queue containing latest completed batches
   val batchInfos = new Queue[BatchInfo]()
 
   override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) {
-    addToQueue(batchStarted.batchInfo)
-    printStats()
-  }
-
-  def addToQueue(newPoint: BatchInfo) {
-    batchInfos.enqueue(newPoint)
+    batchInfos.enqueue(batchStarted.batchInfo)
     if (batchInfos.size > numBatchInfos) batchInfos.dequeue()
+    printStats()
   }
 
   def printStats() {
@@ -71,10 +65,11 @@ class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
   }
 
   def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) {
-    spark.scheduler.StatsReportListener.showMillisDistribution(heading, extractDistribution(getMetric))
+    org.apache.spark.scheduler.StatsReportListener.showMillisDistribution(
+      heading, extractDistribution(getMetric))
   }
 
   def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
     Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble))
   }
-}
\ No newline at end of file
+}
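
Usage note: after this change printStats() runs once the new batch has been enqueued, so the reported distributions include the batch that just completed. Attaching the listener is one line; the context value ssc is assumed:

    // Summarize delays over the last 5 completed batches (default is 10).
    ssc.addStreamingListener(new StatsReportListener(numBatchInfos = 5))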
@@ -78,4 +78,4 @@ private[spark] class StreamingListenerBus() extends Logging {
     }
     return true
   }
-}
\ No newline at end of file
+}
@@ -34,7 +34,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers{
   test("basic BatchInfo generation") {
     val ssc = setupStreams(input, operation)
     val collector = new BatchInfoCollector
-    ssc.addListener(collector)
+    ssc.addStreamingListener(collector)
     runStreams(ssc, input.size, input.size)
     val batchInfos = collector.batchInfos
     batchInfos should have size 4
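
BatchInfoCollector is defined outside the visible hunk; only its batchInfos member is implied by the test body. A plausible minimal sketch, offered as an assumption:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.streaming.scheduler._

    // Test helper that accumulates BatchInfo objects for later assertions.
    class BatchInfoCollector extends StreamingListener {
      val batchInfos = new ArrayBuffer[BatchInfo]()
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
        batchInfos += batchCompleted.batchInfo
      }
    }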