From 895baf8f77e630ce32b0e25b00bf5ee45d17398f Mon Sep 17 00:00:00 2001
From: zsxwing <zsxwing@gmail.com>
Date: Wed, 20 May 2015 19:56:01 -0700
Subject: [PATCH] [SPARK-7777] [STREAMING] Fix the flaky test in
 org.apache.spark.streaming.BasicOperationsSuite

Just added a guard to make sure a batch has completed before moving to the next batch.

Author: zsxwing <zsxwing@gmail.com>

Closes #6306 from zsxwing/SPARK-7777 and squashes the following commits:

ecee529 [zsxwing] Fix the failure message
58634fe [zsxwing] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite
---
 .../org/apache/spark/streaming/BasicOperationsSuite.scala  | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 87bc20f79c..f269cb74e0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -557,6 +557,9 @@ class BasicOperationsSuite extends TestSuiteBase {
     withTestServer(new TestServer()) { testServer =>
       withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
         testServer.start()
+
+        val batchCounter = new BatchCounter(ssc)
+
         // Set up the streaming context and input streams
         val networkStream =
           ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
@@ -587,7 +590,11 @@ class BasicOperationsSuite extends TestSuiteBase {
         for (i <- 0 until input.size) {
           testServer.send(input(i).toString + "\n")
           Thread.sleep(200)
+          val numCompletedBatches = batchCounter.getNumCompletedBatches
           clock.advance(batchDuration.milliseconds)
+          if (!batchCounter.waitUntilBatchesCompleted(numCompletedBatches + 1, 5000)) {
+            fail("Batch took more than 5 seconds to complete")
+          }
           collectRddInfo()
         }
 
-- 
GitLab