From 768b3d623c29eaf960be096845b7c421f8a3ba36 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu <shixiong@databricks.com> Date: Tue, 12 Apr 2016 17:31:47 -0700 Subject: [PATCH] [SPARK-14579][SQL] Fix a race condition in StreamExecution.processAllAvailable ## What changes were proposed in this pull request? There is a race condition in `StreamExecution.processAllAvailable`. Here is an execution order to reproduce it. | Time |Thread 1 | MicroBatchThread | |:-------------:|:-------------:|:-----:| | 1 | | `dataAvailable in constructNextBatch` returns false | | 2 | addData(newData) | | | 3 | `noNewData = false` in processAllAvailable | | | 4 | | noNewData = true | | 5 | `noNewData` is true so just return | | The root cause is that `checking dataAvailable and change noNewData to true` is not atomic. This PR puts these two actions into `synchronized` to make sure they are atomic. In addition, this PR also has the following changes: - Make `committedOffsets` and `availableOffsets` volatile to make sure they can be seen in other threads. - Copy the reference of `availableOffsets` to a local variable so that `sourceStatuses` can use a snapshot of `availableOffsets`. ## How was this patch tested? Existing unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #12339 from zsxwing/race-condition. --- .../execution/streaming/StreamExecution.scala | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 688e051e1f..87dd27a2b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -59,12 +59,14 @@ class StreamExecution( * Tracks how much data we have processed and committed to the sink or state store from each * input source. */ + @volatile private[sql] var committedOffsets = new StreamProgress /** * Tracks the offsets that are available to be processed, but have not yet be committed to the * sink. */ + @volatile private var availableOffsets = new StreamProgress /** The current batchId or -1 if execution has not yet been initialized. */ @@ -111,7 +113,8 @@ class StreamExecution( /** Returns current status of all the sources. */ override def sourceStatuses: Array[SourceStatus] = { - sources.map(s => new SourceStatus(s.toString, availableOffsets.get(s))).toArray + val localAvailableOffsets = availableOffsets + sources.map(s => new SourceStatus(s.toString, localAvailableOffsets.get(s))).toArray } /** Returns current status of the sink. */ @@ -228,7 +231,7 @@ class StreamExecution( * Queries all of the sources to see if any new data is available. When there is new data the * batchId counter is incremented and a new log entry is written with the newest offsets. */ - private def constructNextBatch(): Boolean = { + private def constructNextBatch(): Unit = { // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). // If we interrupt some thread running Shell.runCommand, we may hit this issue. // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand" @@ -241,7 +244,15 @@ class StreamExecution( } availableOffsets ++= newData - if (dataAvailable) { + val hasNewData = awaitBatchLock.synchronized { + if (dataAvailable) { + true + } else { + noNewData = true + false + } + } + if (hasNewData) { // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). // If we interrupt some thread running Shell.runCommand, we may hit this issue. // As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set @@ -254,15 +265,11 @@ class StreamExecution( } currentBatchId += 1 logInfo(s"Committed offsets for batch $currentBatchId.") - true } else { - noNewData = true awaitBatchLock.synchronized { // Wake up any threads that are waiting for the stream to progress. awaitBatchLock.notifyAll() } - - false } } @@ -353,7 +360,10 @@ class StreamExecution( * least the given `Offset`. This method is indented for use primarily when writing tests. */ def awaitOffset(source: Source, newOffset: Offset): Unit = { - def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset + def notDone = { + val localCommittedOffsets = committedOffsets + !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset + } while (notDone) { logInfo(s"Waiting until $newOffset at $source") @@ -365,13 +375,17 @@ class StreamExecution( /** A flag to indicate that a batch has completed with no new data available. */ @volatile private var noNewData = false - override def processAllAvailable(): Unit = { + override def processAllAvailable(): Unit = awaitBatchLock.synchronized { noNewData = false - while (!noNewData) { - awaitBatchLock.synchronized { awaitBatchLock.wait(10000) } - if (streamDeathCause != null) { throw streamDeathCause } + while (true) { + awaitBatchLock.wait(10000) + if (streamDeathCause != null) { + throw streamDeathCause + } + if (noNewData) { + return + } } - if (streamDeathCause != null) { throw streamDeathCause } } override def awaitTermination(): Unit = { -- GitLab