diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index c7262ea97200fbad69a06e149ce86ea89435bd47..e42df5dd61c702a799534c337be008d5968bcde9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -68,6 +68,20 @@ case class FlatMapGroupsWithStateExec(
     val encSchemaAttribs = stateEncoder.schema.toAttributes
     if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
   }

+  // Get the serializer for the state, taking into account whether we need to save timestamps
+  private val stateSerializer = {
+    val encoderSerializer = stateEncoder.namedExpressions
+    if (isTimeoutEnabled) {
+      encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
+    } else {
+      encoderSerializer
+    }
+  }
+  // Get the deserializer for the state. Note that this must be done in the driver, as
+  // resolving and binding of deserializer expressions to the encoded type can be safely done
+  // only in the driver.
+  private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
+
   /** Distribute by grouping attributes */
   override def requiredChildDistribution: Seq[Distribution] =
@@ -139,19 +153,9 @@ case class FlatMapGroupsWithStateExec(
       ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
     private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)

-    // Converter for translating state rows to Java objects
+    // Converters for translating state between rows and Java objects
     private val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
-      stateEncoder.resolveAndBind().deserializer, stateAttributes)
-
-    // Converter for translating state Java objects to rows
-    private val stateSerializer = {
-      val encoderSerializer = stateEncoder.namedExpressions
-      if (isTimeoutEnabled) {
-        encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
-      } else {
-        encoderSerializer
-      }
-    }
+      stateDeserializer, stateAttributes)
     private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)

     // Index of the additional metadata fields in the state row
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index f705da3d6a709f2112045ce72e2f78848afdf4ed..171877abe6e92f5df8392476c3d0191c3daf5547 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -909,7 +909,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }

-  test("max files per trigger - incorrect values") {
+  testQuietly("max files per trigger - incorrect values") {
     val testTable = "maxFilesPerTrigger_test"
     withTable(testTable) {
       withTempDir { case src =>
@@ -1326,7 +1326,7 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {

   import testImplicits._

-  test("file source stress test") {
+  testQuietly("file source stress test") {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")

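Note on the FlatMapGroupsWithStateExec hunks above: resolveAndBind() is hoisted out of the per-task converter into a driver-evaluated val, so the bound deserializer expression is built once on the driver and then captured by the executor-side closure. A minimal sketch of the encoder calls involved, assuming Spark 2.2-era catalyst APIs (UserState and StateEncoderSketch are made up for illustration; ExpressionEncoder, resolveAndBind, namedExpressions, and deserializer are the calls the patch actually uses):

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

    object StateEncoderSketch {
      // Hypothetical state class standing in for the user's state type.
      case class UserState(count: Long)

      def main(args: Array[String]): Unit = {
        val stateEncoder = ExpressionEncoder[UserState]()

        // Driver side: resolve attribute references and bind them to the encoded
        // schema once; the resulting expression is safe to ship to executors.
        val stateDeserializer = stateEncoder.resolveAndBind().deserializer

        // Serializer expressions; the patch appends Literal(GroupStateImpl.NO_TIMESTAMP)
        // to this sequence when timeouts are enabled.
        val stateSerializer = stateEncoder.namedExpressions

        println(stateDeserializer)
        println(stateSerializer.map(_.name))
      }
    }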
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index a00a1a582a971b6afd381ce27377956811bb88dc..c8e31e3ca2e04998a99dff79e26491c16355069b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -21,6 +21,8 @@ import java.sql.Date
 import java.util.concurrent.ConcurrentHashMap

 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout

 import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
@@ -574,11 +576,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
       assertNumStateRows(total = 1, updated = 2),
       StopStream,

-      StartStream(ProcessingTime("1 second"), triggerClock = clock),
-      AdvanceManualClock(10 * 1000),
+      StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),

       AddData(inputData, "c"),
-      AdvanceManualClock(1 * 1000),
+      AdvanceManualClock(11 * 1000),
       CheckLastBatch(("b", "-1"), ("c", "1")),
       assertNumStateRows(total = 1, updated = 2),

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 32920f6dfa2232c389c761f67db9ff4b6460bb6a..388f15405e70b64397132900a85894c955d86d41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -426,7 +426,7 @@ class StreamSuite extends StreamTest {
       CheckAnswer((1, 2), (2, 2), (3, 2)))
   }

-  test("recover from a Spark v2.1 checkpoint") {
+  testQuietly("recover from a Spark v2.1 checkpoint") {
     var inputData: MemoryStream[Int] = null
     var query: DataStreamWriter[Row] = null

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 8cf1791336814208cc248fc715c678bffbc4355a..951ff2ca0d6845053dd2ef109afad19d481d3153 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -488,8 +488,27 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {

         case a: AddData =>
           try {
-            // Add data and get the source where it was added, and the expected offset of the
-            // added data.
+
+            // If the query is running with manual clock, then wait for the stream execution
+            // thread to start waiting for the clock to increment. This is needed so that we
+            // are adding data when there is no trigger that is active. This would ensure that
+            // the data gets deterministically added to the next batch triggered after the manual
+            // clock is incremented in following AdvanceManualClock. This avoids race conditions
+            // between the test thread and the stream execution thread in tests using manual
+            // clock.
+            if (currentStream != null &&
+                currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+              val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+              eventually("Error while synchronizing with manual clock before adding data") {
+                if (currentStream.isActive) {
+                  assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+                }
+              }
+              if (!currentStream.isActive) {
+                failTest("Query terminated while synchronizing with manual clock")
+              }
+            }
+            // Add data
             val queryToUse = Option(currentStream).orElse(Option(lastStream))
             val (source, offset) = a.addData(queryToUse)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3f41ecdb7ff680fa5dec136f7e3aca25b236a4c0..1172531fe9988e531b964ec65e2421b53616e6a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -487,7 +487,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }

-  test("StreamingQuery should be Serializable but cannot be used in executors") {
+  testQuietly("StreamingQuery should be Serializable but cannot be used in executors") {
     def startQuery(ds: Dataset[Int], queryName: String): StreamingQuery = {
       ds.writeStream
         .queryName(queryName)
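The StreamTest hunk above is the core of the fix: AddData now waits until the stream execution thread is provably parked on the manual clock before pushing data, so the data lands deterministically in the batch fired by the next AdvanceManualClock. Below is a self-contained sketch of that handshake; SketchClock is a simplified stand-in for Spark's StreamManualClock, and it uses ScalaTest's stock eventually(Timeout(...)) overload rather than the message-taking eventually helper the patch calls:

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.concurrent.PatienceConfiguration.Timeout
    import org.scalatest.time.{Seconds, Span}

    // Simplified manual clock: a "stream" thread blocks in waitTillTime, and the
    // test thread can observe that it is blocked via isStreamWaitingAt.
    class SketchClock {
      private var timeMillis = 0L
      private var waiting = false

      def getTimeMillis(): Long = synchronized { timeMillis }

      def advance(ms: Long): Unit = synchronized {
        timeMillis += ms
        notifyAll()
      }

      def waitTillTime(target: Long): Long = synchronized {
        while (timeMillis < target) {
          waiting = true
          wait()  // releases the lock; woken up by advance()
        }
        waiting = false
        timeMillis
      }

      def isStreamWaitingAt(time: Long): Boolean = synchronized {
        waiting && timeMillis == time
      }
    }

    object ClockSyncSketch {
      def main(args: Array[String]): Unit = {
        val clock = new SketchClock
        val stream = new Thread(new Runnable {
          def run(): Unit = clock.waitTillTime(1000)  // stands in for the trigger loop
        })
        stream.start()

        // Test thread, mirroring the patched AddData: block until the stream
        // thread is waiting on the clock, so no trigger is mid-flight.
        eventually(Timeout(Span(10, Seconds))) {
          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
        }
        // ... data would be added here, race-free ...
        clock.advance(1000)  // the AdvanceManualClock step
        stream.join()
      }
    }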