From 567a50acfb0ae26bd430c290348886d494963696 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Fri, 31 Mar 2017 10:58:43 -0700
Subject: [PATCH] [SPARK-20165][SS] Resolve state encoder's deserializer in
 driver in FlatMapGroupsWithStateExec

## What changes were proposed in this pull request?

- The encoder's deserializer must be resolved at the driver, where the class is defined. Otherwise, in corner cases involving nested classes, resolving it at the executor can fail.
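
A minimal sketch of the idea, using a hypothetical `RunningCount` state class (the actual change is in the `FlatMapGroupsWithStateExec` hunk below):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class RunningCount(count: Long)  // hypothetical state type

// Created and resolved on the driver, where RunningCount's class is visible.
val stateEncoder = ExpressionEncoder[RunningCount]()
val stateDeserializer = stateEncoder.resolveAndBind().deserializer

// Tasks on executors only capture and evaluate the already-resolved expression;
// they never have to resolve the deserializer against the (possibly nested) class.
```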

- Fixed a flaky test related to processing-time timeout. The flakiness was caused by a race condition between the test thread (which adds data to the memory source) and the streaming query thread. When testing with the manual clock, the goal is to add data and increment the clock together atomically, so that a trigger sees the new data AND the updated clock simultaneously (both or none). This fix adds additional synchronization when adding data: it makes sure that the streaming query thread is waiting on the manual clock to be incremented (so no batch is currently running) before the data is added.
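
A condensed sketch of that wait (helper names such as `StreamManualClock`, `isStreamWaitingAt`, `currentStream`, and `streamingTimeout` come from the test harness; see the `StreamTest.scala` hunk below for the actual change):

```scala
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.PatienceConfiguration.Timeout

currentStream.triggerClock match {
  case clock: StreamManualClock =>
    // Block the test thread until the query thread is parked on the manual clock,
    // i.e. no trigger is running, before adding data to the memory source.
    eventually(Timeout(streamingTimeout)) {
      if (currentStream.isActive) {
        assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
      }
    }
  case _ => // wall-clock triggers need no extra synchronization before adding data
}
```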

- Added `testQuietly` to some tests that generate a lot of error logs.

## How was this patch tested?
Multiple runs of the existing unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17488 from tdas/SPARK-20165.
---
 .../FlatMapGroupsWithStateExec.scala          | 28 +++++++++++--------
 .../sql/streaming/FileStreamSourceSuite.scala |  4 +--
 .../FlatMapGroupsWithStateSuite.scala         |  7 +++--
 .../spark/sql/streaming/StreamSuite.scala     |  2 +-
 .../spark/sql/streaming/StreamTest.scala      | 23 +++++++++++++--
 .../sql/streaming/StreamingQuerySuite.scala   |  2 +-
 6 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index c7262ea972..e42df5dd61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -68,6 +68,20 @@ case class FlatMapGroupsWithStateExec(
     val encSchemaAttribs = stateEncoder.schema.toAttributes
     if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
   }
+  // Get the serializer for the state, taking into account whether we need to save timestamps
+  private val stateSerializer = {
+    val encoderSerializer = stateEncoder.namedExpressions
+    if (isTimeoutEnabled) {
+      encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
+    } else {
+      encoderSerializer
+    }
+  }
+  // Get the deserializer for the state. Note that this must be done in the driver, as
+  // resolving and binding of deserializer expressions to the encoded type can be safely done
+  // only in the driver.
+  private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
+
 
   /** Distribute by grouping attributes */
   override def requiredChildDistribution: Seq[Distribution] =
@@ -139,19 +153,9 @@ case class FlatMapGroupsWithStateExec(
       ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
     private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)
 
-    // Converter for translating state rows to Java objects
+    // Converters for translating state between rows and Java objects
     private val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
-      stateEncoder.resolveAndBind().deserializer, stateAttributes)
-
-    // Converter for translating state Java objects to rows
-    private val stateSerializer = {
-      val encoderSerializer = stateEncoder.namedExpressions
-      if (isTimeoutEnabled) {
-        encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
-      } else {
-        encoderSerializer
-      }
-    }
+      stateDeserializer, stateAttributes)
     private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
 
     // Index of the additional metadata fields in the state row
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index f705da3d6a..171877abe6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -909,7 +909,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger - incorrect values") {
+  testQuietly("max files per trigger - incorrect values") {
     val testTable = "maxFilesPerTrigger_test"
     withTable(testTable) {
       withTempDir { case src =>
@@ -1326,7 +1326,7 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
-  test("file source stress test") {
+  testQuietly("file source stress test") {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index a00a1a582a..c8e31e3ca2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -21,6 +21,8 @@ import java.sql.Date
 import java.util.concurrent.ConcurrentHashMap
 
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 
 import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
@@ -574,11 +576,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
       assertNumStateRows(total = 1, updated = 2),
 
       StopStream,
-      StartStream(ProcessingTime("1 second"), triggerClock = clock),
-      AdvanceManualClock(10 * 1000),
+      StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
 
       AddData(inputData, "c"),
-      AdvanceManualClock(1 * 1000),
+      AdvanceManualClock(11 * 1000),
       CheckLastBatch(("b", "-1"), ("c", "1")),
       assertNumStateRows(total = 1, updated = 2),
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 32920f6dfa..388f15405e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -426,7 +426,7 @@ class StreamSuite extends StreamTest {
       CheckAnswer((1, 2), (2, 2), (3, 2)))
   }
 
-  test("recover from a Spark v2.1 checkpoint") {
+  testQuietly("recover from a Spark v2.1 checkpoint") {
     var inputData: MemoryStream[Int] = null
     var query: DataStreamWriter[Row] = null
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 8cf1791336..951ff2ca0d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -488,8 +488,27 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
           case a: AddData =>
             try {
-              // Add data and get the source where it was added, and the expected offset of the
-              // added data.
+
+              // If the query is running with a manual clock, then wait for the stream execution
+              // thread to start waiting for the clock to be incremented. This is needed so that
+              // we add data only when no trigger is active. It ensures that the data gets
+              // deterministically added to the next batch triggered after the manual clock is
+              // incremented by the following AdvanceManualClock. This avoids race conditions
+              // between the test thread and the stream execution thread in tests that use a
+              // manual clock.
+              if (currentStream != null &&
+                  currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+                val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+                eventually("Error while synchronizing with manual clock before adding data") {
+                  if (currentStream.isActive) {
+                    assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+                  }
+                }
+                if (!currentStream.isActive) {
+                  failTest("Query terminated while synchronizing with manual clock")
+                }
+              }
+              // Add data
               val queryToUse = Option(currentStream).orElse(Option(lastStream))
               val (source, offset) = a.addData(queryToUse)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3f41ecdb7f..1172531fe9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -487,7 +487,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
-  test("StreamingQuery should be Serializable but cannot be used in executors") {
+  testQuietly("StreamingQuery should be Serializable but cannot be used in executors") {
     def startQuery(ds: Dataset[Int], queryName: String): StreamingQuery = {
       ds.writeStream
         .queryName(queryName)
-- 
GitLab