diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 0e40abac65251bd91cc7dd4d2c0fb1585a768b90..544fbc5ec36a26339ab4a9dea432dc0e20df04fa 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -448,7 +448,7 @@ class KafkaSourceSuite extends KafkaSourceTest { AddKafkaData(Set(topic), 1, 2, 3), CheckAnswer(2, 3, 4), AssertOnQuery { query => - val recordsRead = query.recentProgresses.map(_.numInputRows).sum + val recordsRead = query.recentProgress.map(_.numInputRows).sum recordsRead == 3 } ) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 82d50f9891d9f8b1ce6bc03ac3d60520bf59bab7..b215d8867d2f470cdeba3cb12ef9a1653fc7f353 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -91,7 +91,7 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"), diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index ee7a26d00df4bc491e5f67abbc4213f926de6273..9cfb3fe25cdcc18d41615b9b5b059dc81576b2f2 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -114,12 +114,12 @@ class StreamingQuery(object): @property @since(2.1) - def recentProgresses(self): + def recentProgress(self): """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. The number of progress updates retained for each stream is configured by Spark session - configuration `spark.sql.streaming.numRecentProgresses`. + configuration `spark.sql.streaming.numRecentProgressUpdates`. """ - return [json.loads(p.json()) for p in self._jsq.recentProgresses()] + return [json.loads(p.json()) for p in self._jsq.recentProgress()] @property @since(2.1) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 66a3490a640ba5288d0bac5482736a362461c875..50df68b14483dd0a01f973439c42ba545362a90b 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1116,11 +1116,11 @@ class SQLTests(ReusedPySparkTestCase): try: q.processAllAvailable() lastProgress = q.lastProgress - recentProgresses = q.recentProgresses + recentProgress = q.recentProgress status = q.status self.assertEqual(lastProgress['name'], q.name) self.assertEqual(lastProgress['id'], q.id) - self.assertTrue(any(p == lastProgress for p in recentProgresses)) + self.assertTrue(any(p == lastProgress for p in recentProgress)) self.assertTrue( "message" in status and "isDataAvailable" in status and diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 12d0c1e9b49f0ee6d46433f6a1060cc17e07c55b..40e3151337af62044d8e759cced04d97182c1f68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -94,7 +94,7 @@ trait ProgressReporter extends Logging { def status: StreamingQueryStatus = currentStatus /** Returns an array containing the most recent query progress updates. */ - def recentProgresses: Array[StreamingQueryProgress] = progressBuffer.synchronized { + def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized { progressBuffer.toArray } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5b45df69e6791423646cacaf81fb0fbdbc6918be..91f3fe0fe9549c1c65fe9b52bea50e723a5b9547 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -617,7 +617,7 @@ object SQLConf { .createWithDefault(false) val STREAMING_PROGRESS_RETENTION = - SQLConfigBuilder("spark.sql.streaming.numRecentProgresses") + SQLConfigBuilder("spark.sql.streaming.numRecentProgressUpdates") .doc("The number of progress updates to retain for a streaming query") .intConf .createWithDefault(100) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 1794e75462cfda9b4156a5c2451feb3ebf9ee00b..596bd90140cc96c78d779ba7894d69fcf004613a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -87,11 +87,11 @@ trait StreamingQuery { /** * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. * The number of progress updates retained for each stream is configured by Spark session - * configuration `spark.sql.streaming.numRecentProgresses`. + * configuration `spark.sql.streaming.numRecentProgressUpdates`. * * @since 2.1.0 */ - def recentProgresses: Array[StreamingQueryProgress] + def recentProgress: Array[StreamingQueryProgress] /** * Returns the most recent [[StreamingQueryProgress]] update of this streaming query. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala index 4a3eeb70b17025cd4ff533186a70b33d7bd87e81..9137d650e906b9d6b8f1936c01217ae332b8459e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala @@ -263,9 +263,9 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf try { inputData.addData(10, 11, 12) query.processAllAvailable() - val recentProgress = query.recentProgresses.filter(_.numInputRows != 0).headOption + val recentProgress = query.recentProgress.filter(_.numInputRows != 0).headOption assert(recentProgress.isDefined && recentProgress.get.numInputRows === 3, - s"recentProgresses[${query.recentProgresses.toList}] doesn't contain correct metrics") + s"recentProgress[${query.recentProgress.toList}] doesn't contain correct metrics") } finally { query.stop() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index ff1f3e26f1593f8724de1d1fda65b57beccd6f12..7b6fe83b9a597fe88ce9511a1c6a9d8b5fe9ebff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1006,7 +1006,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { AddTextFileData("100", src, tmp), CheckAnswer("100"), AssertOnQuery { query => - val actualProgress = query.recentProgresses + val actualProgress = query.recentProgress .find(_.numInputRows > 0) .getOrElse(sys.error("Could not find records with data.")) assert(actualProgress.numInputRows === 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 1cd503c6de6963d697dd32d164825b6b18d3ee4e..b78d1353e8dcbf4516bd25926566d4238dc9a697 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -237,9 +237,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } true } - // `recentProgresses` should not receive too many no data events + // `recentProgress` should not receive too many no data events actions += AssertOnQuery { q => - q.recentProgresses.size > 1 && q.recentProgresses.size <= 11 + q.recentProgress.size > 1 && q.recentProgress.size <= 11 } testStream(input.toDS)(actions: _*) spark.sparkContext.listenerBus.waitUntilEmpty(10000) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 55dd1a5d51e37903014e6f6be26d7c119db98cfa..7be2f216919b05e65508ae2ef57ba6efc83c14b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -152,7 +152,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { ) } - testQuietly("status, lastProgress, and recentProgresses") { + testQuietly("status, lastProgress, and recentProgress") { import StreamingQuerySuite._ clock = new StreamManualClock @@ -201,7 +201,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while offset is being fetched AddData(inputData, 1, 2), @@ -210,7 +210,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message.startsWith("Getting offsets from")), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch is being fetched AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch @@ -218,14 +218,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message === "Processing new data"), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch is being processed AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message === "Processing new data"), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch processing has completed AdvanceManualClock(500), // time = 1100 to unblock job @@ -236,8 +236,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.message === "Waiting for next trigger"), AssertOnQuery { query => assert(query.lastProgress != null) - assert(query.recentProgresses.exists(_.numInputRows > 0)) - assert(query.recentProgresses.last.eq(query.lastProgress)) + assert(query.recentProgress.exists(_.numInputRows > 0)) + assert(query.recentProgress.last.eq(query.lastProgress)) val progress = query.lastProgress assert(progress.id === query.id) @@ -274,7 +274,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), AssertOnQuery { query => - assert(query.recentProgresses.last.eq(query.lastProgress)) + assert(query.recentProgress.last.eq(query.lastProgress)) assert(query.lastProgress.batchId === 1) assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818) true @@ -408,7 +408,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { try { val q = streamingDF.writeStream.format("memory").queryName("test").start() q.processAllAvailable() - q.recentProgresses.head + q.recentProgress.head } finally { spark.streams.active.map(_.stop()) }