From dc3ee6b6122229cd99a133baf10a46dac2f7e9e2 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Mon, 23 Dec 2013 11:30:42 -0800
Subject: [PATCH] Added comments to BatchInfo and JobSet, based on Patrick's
 comment on PR 277.

---
 .../spark/streaming/scheduler/BatchInfo.scala | 19 +++++++++++++++++++
 .../spark/streaming/scheduler/JobSet.scala    | 10 +++++++---
 2 files changed, 26 insertions(+), 3 deletions(-)
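
Note for reviewers: below is a minimal sketch of how the delay metrics
documented in this patch might be consumed through the StreamingListener
interface that this PR introduces (assumed available in this tree). The
listener class name and log format are illustrative only, not part of
this patch:

    import org.apache.spark.streaming.scheduler._

    // Logs the delay metrics documented in BatchInfo. Each metric is an
    // Option[Long] and stays empty until the corresponding processing
    // times have been recorded.
    class BatchDelayLogger extends StreamingListener {
      override def onBatchCompleted(completed: StreamingListenerBatchCompleted) {
        val info = completed.batchInfo
        println("batch " + info.batchTime +
          ": scheduling=" + info.schedulingDelay.getOrElse(-1L) + " ms" +
          ", processing=" + info.processingDelay.getOrElse(-1L) + " ms" +
          ", total=" + info.totalDelay.getOrElse(-1L) + " ms")
      }
    }

Assuming the addStreamingListener hook from the same change, the sketch
would be registered with ssc.addStreamingListener(new BatchDelayLogger).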

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 88e4af59b7..e3fb07624e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -21,6 +21,12 @@ import org.apache.spark.streaming.Time
 
 /**
  * Class having information on completed batches.
+ * @param batchTime   Time of the batch
+ * @param submissionTime  Clock time of when jobs of this batch were submitted to
+ *                        the streaming scheduler queue
+ * @param processingStartTime Clock time of when the first job of this batch started processing
+ * @param processingEndTime Clock time of when the last job of this batch finished processing
+ *
  */
 case class BatchInfo(
     batchTime: Time,
@@ -29,9 +35,22 @@ case class BatchInfo(
     processingEndTime: Option[Long]
   ) {
 
+  /**
+   * Time taken for the first job of this batch to start processing from the time this batch
+   * was submitted to the streaming scheduler. Essentially, it is
+   * `processingStartTime` - `submissionTime`.
+   */
   def schedulingDelay = processingStartTime.map(_ - submissionTime)
 
+  /**
+   * Time taken for all the jobs of this batch to finish processing from the time they started
+   * processing. Essentially, it is `processingEndTime` - `processingStartTime`.
+   */
   def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
 
+  /**
+   * Time taken for all the jobs of this batch to finish processing from the time they
+   * were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
+   */
   def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index cf7431a8a3..57268674ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -27,9 +27,9 @@ private[streaming]
 case class JobSet(time: Time, jobs: Seq[Job]) {
 
   private val incompleteJobs = new HashSet[Job]()
-  var submissionTime = System.currentTimeMillis()
-  var processingStartTime = -1L
-  var processingEndTime = -1L
+  var submissionTime = System.currentTimeMillis() // clock time when this jobset was submitted
+  var processingStartTime = -1L // clock time when the first job of this jobset started processing
+  var processingEndTime = -1L // clock time when the last job of this jobset finished processing
 
   jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
   incompleteJobs ++= jobs
@@ -47,8 +47,12 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
 
   def hasCompleted() = incompleteJobs.isEmpty
 
+  // Time taken to process all the jobs from the time they started processing
+  // (i.e. not including the time they wait in the streaming scheduler queue)
   def processingDelay = processingEndTime - processingStartTime
 
+  // Time taken to process all the jobs from the time they were submitted
+  // (i.e. including the time they wait in the streaming scheduler queue)
   def totalDelay = {
     processingEndTime - time.milliseconds
   }
-- 
GitLab