Commit 9533f539 authored by Shixiong Zhu

[SPARK-6005][TESTS] Fix flaky test: o.a.s.streaming.kafka.DirectKafkaStreamSuite.offset recovery

## What changes were proposed in this pull request?

Because this test extracts data from `DStream.generatedRDDs` before stopping, new RDDs may still be generated (and checkpointed) after the snapshot is taken. After recovering from the checkpoint, `recoveredOffsetRanges` may therefore contain ranges that are not in `offsetRangesBeforeStop`, which fails the test. Adding `Thread.sleep(1000)` before `ssc.stop()` reproduces this failure reliably.

This PR moves the collection of `offsetRangesBeforeStop` (renamed to `offsetRangesAfterStop`) to after `ssc.stop()`, so the snapshot is taken only once no new RDDs can be generated.
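The race can be illustrated without Spark. Below is a minimal sketch (all names hypothetical, not Spark APIs): `TinyStream` stands in for a `DStream` whose `generatedRDDs` keeps growing on a background thread until `stop()` is called. A snapshot taken before `stop()` may be a strict prefix of the final state, while a snapshot taken after `stop()` is authoritative — the same reason the test must collect offset ranges after `ssc.stop()`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a DStream: a background "batch generator"
// keeps appending batch ids until stop() is called, mimicking how
// generatedRDDs keeps growing while the streaming context is running.
class TinyStream {
  private val generated = ArrayBuffer[Int]()
  @volatile private var running = true
  private val gen = new Thread(() => {
    var batch = 0
    while (running) {
      generated.synchronized { generated += batch }
      batch += 1
      Thread.sleep(10)
    }
  })
  gen.start()

  // Copy of everything generated so far (like reading generatedRDDs).
  def snapshot(): Seq[Int] = generated.synchronized { generated.toList }

  // Stop generation and wait for the generator thread to finish.
  def stop(): Unit = { running = false; gen.join() }
}

object RaceSketch {
  def main(args: Array[String]): Unit = {
    val s = new TinyStream()
    Thread.sleep(100)
    val before = s.snapshot() // taken while batches are still being generated
    s.stop()
    val after = s.snapshot()  // taken once generation has fully stopped
    // "after" is authoritative; "before" may have missed the last batches.
    assert(before.forall(after.contains))
    assert(after.size >= before.size)
    println(s"before=${before.size} after=${after.size}")
  }
}
```

In the test, the checkpoint plays the role of `after`: it sees RDDs generated right up to shutdown, so comparing it against a pre-stop snapshot can spuriously fail.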

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12903 from zsxwing/SPARK-6005.
parent 603c4f8e
```diff
@@ -281,14 +281,20 @@ class DirectKafkaStreamSuite
       sendDataAndWaitForReceive(i)
     }
-    ssc.stop()
     // Verify that offset ranges were generated
-    val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
-    assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+    // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should
+    // collect offset ranges after stopping. Otherwise, because new RDDs keep being generated before
+    // stopping, we may not be able to get the latest RDDs, then "recoveredOffsetRanges" will
+    // contain something not in "offsetRangesAfterStop".
+    val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
+    assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
     assert(
-      offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+      offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
       "starting offset not zero"
     )
+
+    ssc.stop()
     logInfo("====== RESTARTING ========")

     // Recover context from checkpoints
@@ -298,12 +304,14 @@ class DirectKafkaStreamSuite
     // Verify offset ranges have been recovered
     val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
     assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
     assert(
       recoveredOffsetRanges.forall { or =>
         earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
       },
-      "Recovered ranges are not the same as the ones generated"
+      "Recovered ranges are not the same as the ones generated\n" +
+        s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
+        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
     )
     // Restart context, give more data and verify the total at the end
     // If the total is write that means each records has been received only once
```