Skip to content
Snippets Groups Projects
Commit 895baf8f authored by zsxwing's avatar zsxwing Committed by Tathagata Das
Browse files

[SPARK-7777] [STREAMING] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite

Just added a guard to make sure a batch has completed before moving to the next batch.

Author: zsxwing <zsxwing@gmail.com>

Closes #6306 from zsxwing/SPARK-7777 and squashes the following commits:

ecee529 [zsxwing] Fix the failure message
58634fe [zsxwing] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite
parent a70bf06b
No related branches found
No related tags found
No related merge requests found
...@@ -557,6 +557,9 @@ class BasicOperationsSuite extends TestSuiteBase { ...@@ -557,6 +557,9 @@ class BasicOperationsSuite extends TestSuiteBase {
withTestServer(new TestServer()) { testServer => withTestServer(new TestServer()) { testServer =>
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
testServer.start() testServer.start()
val batchCounter = new BatchCounter(ssc)
// Set up the streaming context and input streams // Set up the streaming context and input streams
val networkStream = val networkStream =
ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
...@@ -587,7 +590,11 @@ class BasicOperationsSuite extends TestSuiteBase { ...@@ -587,7 +590,11 @@ class BasicOperationsSuite extends TestSuiteBase {
for (i <- 0 until input.size) { for (i <- 0 until input.size) {
testServer.send(input(i).toString + "\n") testServer.send(input(i).toString + "\n")
Thread.sleep(200) Thread.sleep(200)
val numCompletedBatches = batchCounter.getNumCompletedBatches
clock.advance(batchDuration.milliseconds) clock.advance(batchDuration.milliseconds)
if (!batchCounter.waitUntilBatchesCompleted(numCompletedBatches + 1, 5000)) {
fail("Batch took more than 5 seconds to complete")
}
collectRddInfo() collectRddInfo()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment