From edc87e18922b98be47c298cdc3daa2b049a737e9 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Wed, 7 Dec 2016 13:47:44 -0800
Subject: [PATCH] [SPARK-18588][TESTS] Fix flaky test:
 KafkaSourceStressForDontFailOnDataLossSuite

## What changes were proposed in this pull request?

Fixed the following failures:

```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 3745 times over 1.0000790851666665 minutes. Last failure message: assertion failed: failOnDataLoss-0 not deleted after timeout.
```

```
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query query-66 terminated with exception: null
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:146)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: null
    at java.util.ArrayList.addAll(ArrayList.java:577)
    at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:257)
    at org.apache.kafka.clients.Metadata.update(Metadata.java:177)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:605)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:582)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at ...
```

## How was this patch tested?

Tested in #16048 by running many times.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16109 from zsxwing/fix-kafka-flaky-test.
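The heart of the first fix is to keep retrying the topic deletion until every verification check passes, rather than asserting once on a combined boolean. A condensed sketch of the new retry helper in `KafkaTestUtils.scala` (illustrative only; it assumes the imports already present in that file, e.g. `AdminUtils`, `ZkUtils`, `KafkaServer`, ScalaTest's `Eventually._` and `SpanSugar._`):

```scala
// Condensed sketch of the retry logic added to KafkaTestUtils.
private def verifyTopicDeletionWithRetries(
    zkUtils: ZkUtils,
    topic: String,
    numPartitions: Int,
    servers: Seq[KafkaServer]): Unit = {
  eventually(timeout(60.seconds), interval(200.millis)) {
    try {
      // Asserts that the topic is gone from ZooKeeper, the brokers' replica managers,
      // the log managers and the cleaner-offset checkpoints.
      verifyTopicDeletion(topic, numPartitions, servers)
    } catch {
      case e: Throwable =>
        // ZooKeeper is updated asynchronously, so the topic may be recreated right
        // after it was deleted; delete it again and let `eventually` retry.
        AdminUtils.deleteTopic(zkUtils, topic)
        throw e
    }
  }
}
```

The `local[2,3]` master used by the stress suite is the complementary piece for the second failure: it allows up to 3 task retries, so a transient NPE thrown by `KafkaConsumer.poll` while a topic is being deleted no longer fails the query.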
---
 .../sql/kafka010/CachedKafkaConsumer.scala         | 39 ++++++++--
 .../spark/sql/kafka010/KafkaSource.scala           |  2 +-
 .../spark/sql/kafka010/KafkaSourceSuite.scala      | 11 ++-
 .../spark/sql/kafka010/KafkaTestUtils.scala        | 75 ++++++++++++-------
 .../spark/sql/test/SharedSQLContext.scala          |  8 +-
 5 files changed, 96 insertions(+), 39 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 3f438e9918..3f396a7e6b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -86,7 +86,7 @@ private[kafka010] case class CachedKafkaConsumer private(
     var toFetchOffset = offset
     while (toFetchOffset != UNKNOWN_OFFSET) {
       try {
-        return fetchData(toFetchOffset, pollTimeoutMs)
+        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -159,14 +159,18 @@ private[kafka010] case class CachedKafkaConsumer private(
   }
 
   /**
-   * Get the record at `offset`.
+   * Get the record for the given offset if available. Otherwise it will either throw error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
+   * or null.
    *
    * @throws OffsetOutOfRangeException if `offset` is out of range
    * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
    */
   private def fetchData(
       offset: Long,
-      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
     if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
       // This is the first fetch, or the last pre-fetched data has been drained.
       // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
@@ -190,10 +194,31 @@ private[kafka010] case class CachedKafkaConsumer private(
     } else {
       val record = fetchedData.next()
       nextOffsetInFetchedData = record.offset + 1
-      // `seek` is always called before "poll". So "record.offset" must be same as "offset".
-      assert(record.offset == offset,
-        s"The fetched data has a different offset: expected $offset but was ${record.offset}")
-      record
+      // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
+      // available offset. Hence we need to handle offset mismatch.
+      if (record.offset > offset) {
+        // This may happen when some records aged out but their offsets already got verified
+        if (failOnDataLoss) {
+          reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
+          // Never happen as "reportDataLoss" will throw an exception
+          null
+        } else {
+          if (record.offset >= untilOffset) {
+            reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
+            null
+          } else {
+            reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
+            record
+          }
+        }
+      } else if (record.offset < offset) {
+        // This should not happen. If it does happen, then we probably misunderstand Kafka internal
+        // mechanism.
+        throw new IllegalStateException(
+          s"Tried to fetch $offset but the returned record offset was ${record.offset}")
+      } else {
+        record
+      }
     }
   }
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index d9ab4bb4f8..92ee0ed93d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource(
     sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
 
   private val offsetFetchAttemptIntervalMs =
-    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
+    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
 
   private val maxOffsetsPerTrigger =
     sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 2d6ccb22dd..0e40abac65 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -31,11 +31,12 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
 
@@ -811,6 +812,11 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
 
   private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
 
+  override def createSparkSession(): TestSparkSession = {
+    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+  }
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     testUtils = new KafkaTestUtils {
@@ -839,7 +845,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
     }
   }
 
-  ignore("stress test for failOnDataLoss=false") {
+  test("stress test for failOnDataLoss=false") {
     val reader = spark
       .readStream
       .format("kafka")
@@ -848,6 +854,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
       .option("subscribePattern", "failOnDataLoss.*")
       .option("startingOffsets", "earliest")
       .option("failOnDataLoss", "false")
+      .option("fetchOffset.retryIntervalMs", "3000")
     val kafka = reader.load()
       .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .as[(String, String)]
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index f43917e151..fd1689acf6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -184,7 +184,7 @@ class KafkaTestUtils extends Logging {
   def deleteTopic(topic: String): Unit = {
     val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
     AdminUtils.deleteTopic(zkUtils, topic)
-    verifyTopicDeletion(zkUtils, topic, partitions, List(this.server))
+    verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server))
   }
 
   /** Add new paritions to a Kafka topic */
@@ -286,36 +286,57 @@ class KafkaTestUtils extends Logging {
     props
   }
 
+  /** Verify topic is deleted in all places, e.g, brokers, zookeeper. */
   private def verifyTopicDeletion(
+      topic: String,
+      numPartitions: Int,
+      servers: Seq[KafkaServer]): Unit = {
+    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+
+    import ZkUtils._
+    // wait until admin path for delete topic is deleted, signaling completion of topic deletion
+    assert(
+      !zkUtils.pathExists(getDeleteTopicPath(topic)),
+      s"${getDeleteTopicPath(topic)} still exists")
+    assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
+    // ensure that the topic-partition has been deleted from all brokers' replica managers
+    assert(servers.forall(server => topicAndPartitions.forall(tp =>
+      server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+      s"topic $topic still exists in the replica manager")
+    // ensure that logs from all replicas are deleted if delete topic is marked successful
+    assert(servers.forall(server => topicAndPartitions.forall(tp =>
+      server.getLogManager().getLog(tp).isEmpty)),
+      s"topic $topic still exists in log mananger")
+    // ensure that topic is removed from all cleaner offsets
+    assert(servers.forall(server => topicAndPartitions.forall { tp =>
+      val checkpoints = server.getLogManager().logDirs.map { logDir =>
+        new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+      }
+      checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
+    }), s"checkpoint for topic $topic still exists")
+    // ensure the topic is gone
+    assert(
+      !zkUtils.getAllTopics().contains(topic),
+      s"topic $topic still exists on zookeeper")
+  }
+
+  /** Verify topic is deleted. Retry to delete the topic if not. */
+  private def verifyTopicDeletionWithRetries(
       zkUtils: ZkUtils,
       topic: String,
       numPartitions: Int,
       servers: Seq[KafkaServer]) {
-    import ZkUtils._
-    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
-    def isDeleted(): Boolean = {
-      // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-      val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
-      val topicPath = !zkUtils.pathExists(getTopicPath(topic))
-      // ensure that the topic-partition has been deleted from all brokers' replica managers
-      val replicaManager = servers.forall(server => topicAndPartitions.forall(tp =>
-        server.replicaManager.getPartition(tp.topic, tp.partition) == None))
-      // ensure that logs from all replicas are deleted if delete topic is marked successful
-      val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
-        server.getLogManager().getLog(tp).isEmpty))
-      // ensure that topic is removed from all cleaner offsets
-      val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
-        val checkpoints = server.getLogManager().logDirs.map { logDir =>
-          new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
-        }
-        checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
-      })
-      // ensure the topic is gone
-      val deleted = !zkUtils.getAllTopics().contains(topic)
-      deletePath && topicPath && replicaManager && logManager && cleaner && deleted
-    }
-    eventually(timeout(60.seconds)) {
-      assert(isDeleted, s"$topic not deleted after timeout")
+    eventually(timeout(60.seconds), interval(200.millis)) {
+      try {
+        verifyTopicDeletion(topic, numPartitions, servers)
+      } catch {
+        case e: Throwable =>
+          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
+          // chance that a topic will be recreated after deletion due to the asynchronous update.
+          // Hence, delete the topic and retry.
+          AdminUtils.deleteTopic(zkUtils, topic)
+          throw e
+      }
     }
   }
 
@@ -331,7 +352,7 @@ class KafkaTestUtils extends Logging {
       case _ => false
     }
 
-    eventually(timeout(10.seconds)) {
+    eventually(timeout(60.seconds)) {
       assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index db24ee8b46..2239f10870 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -48,14 +48,18 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
    */
   protected implicit def sqlContext: SQLContext = _spark.sqlContext
 
+  protected def createSparkSession: TestSparkSession = {
+    new TestSparkSession(
+      sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+  }
+
   /**
    * Initialize the [[TestSparkSession]].
   */
   protected override def beforeAll(): Unit = {
     SparkSession.sqlListener.set(null)
     if (_spark == null) {
-      _spark = new TestSparkSession(
-        sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
+      _spark = createSparkSession
     }
     // Ensure we have initialized the context before calling parent code
     super.beforeAll()
--
GitLab