From 9533f5390a3ad7ab96a7bea01cdb6aed89503a51 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Tue, 10 May 2016 13:26:53 -0700
Subject: [PATCH] [SPARK-6005][TESTS] Fix flaky test:
 o.a.s.streaming.kafka.DirectKafkaStreamSuite.offset recovery

## What changes were proposed in this pull request?

Because this test extracts data from `DStream.generatedRDDs` before stopping, it may get data before checkpointing. Then after recovering from the checkpoint, `recoveredOffsetRanges` may contain something not in `offsetRangesBeforeStop`, which will fail the test. Adding `Thread.sleep(1000)` before `ssc.stop()` will reproduce this failure.

This PR just moves the logic of `offsetRangesBeforeStop` (also renamed to `offsetRangesAfterStop`) after `ssc.stop()` to fix the flaky test.

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12903 from zsxwing/SPARK-6005.
---
 .../kafka/DirectKafkaStreamSuite.scala        | 20 +++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index f14ff6705f..cb782d27fe 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -281,14 +281,20 @@ class DirectKafkaStreamSuite
       sendDataAndWaitForReceive(i)
     }
 
+    ssc.stop()
+
     // Verify that offset ranges were generated
-    val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
-    assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+    // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should
+    // collect offset ranges after stopping. Otherwise, because new RDDs keep being generated before
+    // stopping, we may not be able to get the latest RDDs, then "recoveredOffsetRanges" will
+    // contain something not in "offsetRangesAfterStop".
+    val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
+    assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
     assert(
-      offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+      offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
       "starting offset not zero"
     )
-    ssc.stop()
+
     logInfo("====== RESTARTING ========")
 
     // Recover context from checkpoints
@@ -298,12 +304,14 @@ class DirectKafkaStreamSuite
     // Verify offset ranges have been recovered
     val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
     assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
     assert(
       recoveredOffsetRanges.forall { or =>
         earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
       },
-      "Recovered ranges are not the same as the ones generated"
+      "Recovered ranges are not the same as the ones generated\n" +
+        s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
+        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
     )
     // Restart context, give more data and verify the total at the end
     // If the total is write that means each records has been received only once
-- 
GitLab