From 70b2bf717d367d598c5a238d569d62c777e63fde Mon Sep 17 00:00:00 2001 From: Michael Armbrust <michael@databricks.com> Date: Wed, 7 Dec 2016 15:36:29 -0800 Subject: [PATCH] [SPARK-18754][SS] Rename recentProgresses to recentProgress Based on an informal survey, users find this option easier to understand / remember. Author: Michael Armbrust <michael@databricks.com> Closes #16182 from marmbrus/renameRecentProgress. --- .../spark/sql/kafka010/KafkaSourceSuite.scala | 2 +- project/MimaExcludes.scala | 2 +- python/pyspark/sql/streaming.py | 6 +++--- python/pyspark/sql/tests.py | 4 ++-- .../execution/streaming/ProgressReporter.scala | 2 +- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/streaming/StreamingQuery.scala | 4 ++-- .../execution/streaming/ForeachSinkSuite.scala | 4 ++-- .../sql/streaming/FileStreamSourceSuite.scala | 2 +- .../StreamingQueryListenerSuite.scala | 4 ++-- .../sql/streaming/StreamingQuerySuite.scala | 18 +++++++++--------- 11 files changed, 25 insertions(+), 25 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 0e40abac65..544fbc5ec3 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -448,7 +448,7 @@ class KafkaSourceSuite extends KafkaSourceTest { AddKafkaData(Set(topic), 1, 2, 3), CheckAnswer(2, 3, 4), AssertOnQuery { query => - val recordsRead = query.recentProgresses.map(_.numInputRows).sum + val recordsRead = query.recentProgress.map(_.numInputRows).sum recordsRead == 3 } ) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 82d50f9891..b215d8867d 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -91,7 +91,7 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"), diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index ee7a26d00d..9cfb3fe25c 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -114,12 +114,12 @@ class StreamingQuery(object): @property @since(2.1) - def recentProgresses(self): + def recentProgress(self): """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. The number of progress updates retained for each stream is configured by Spark session - configuration `spark.sql.streaming.numRecentProgresses`. + configuration `spark.sql.streaming.numRecentProgressUpdates`. """ - return [json.loads(p.json()) for p in self._jsq.recentProgresses()] + return [json.loads(p.json()) for p in self._jsq.recentProgress()] @property @since(2.1) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 66a3490a64..50df68b144 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1116,11 +1116,11 @@ class SQLTests(ReusedPySparkTestCase): try: q.processAllAvailable() lastProgress = q.lastProgress - recentProgresses = q.recentProgresses + recentProgress = q.recentProgress status = q.status self.assertEqual(lastProgress['name'], q.name) self.assertEqual(lastProgress['id'], q.id) - self.assertTrue(any(p == lastProgress for p in recentProgresses)) + self.assertTrue(any(p == lastProgress for p in recentProgress)) self.assertTrue( "message" in status and "isDataAvailable" in status and diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 12d0c1e9b4..40e3151337 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -94,7 +94,7 @@ trait ProgressReporter extends Logging { def status: StreamingQueryStatus = currentStatus /** Returns an array containing the most recent query progress updates. */ - def recentProgresses: Array[StreamingQueryProgress] = progressBuffer.synchronized { + def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized { progressBuffer.toArray } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5b45df69e6..91f3fe0fe9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -617,7 +617,7 @@ object SQLConf { .createWithDefault(false) val STREAMING_PROGRESS_RETENTION = - SQLConfigBuilder("spark.sql.streaming.numRecentProgresses") + SQLConfigBuilder("spark.sql.streaming.numRecentProgressUpdates") .doc("The number of progress updates to retain for a streaming query") .intConf .createWithDefault(100) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 1794e75462..596bd90140 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -87,11 +87,11 @@ trait StreamingQuery { /** * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. * The number of progress updates retained for each stream is configured by Spark session - * configuration `spark.sql.streaming.numRecentProgresses`. + * configuration `spark.sql.streaming.numRecentProgressUpdates`. * * @since 2.1.0 */ - def recentProgresses: Array[StreamingQueryProgress] + def recentProgress: Array[StreamingQueryProgress] /** * Returns the most recent [[StreamingQueryProgress]] update of this streaming query. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala index 4a3eeb70b1..9137d650e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala @@ -263,9 +263,9 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf try { inputData.addData(10, 11, 12) query.processAllAvailable() - val recentProgress = query.recentProgresses.filter(_.numInputRows != 0).headOption + val recentProgress = query.recentProgress.filter(_.numInputRows != 0).headOption assert(recentProgress.isDefined && recentProgress.get.numInputRows === 3, - s"recentProgresses[${query.recentProgresses.toList}] doesn't contain correct metrics") + s"recentProgress[${query.recentProgress.toList}] doesn't contain correct metrics") } finally { query.stop() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index ff1f3e26f1..7b6fe83b9a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1006,7 +1006,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { AddTextFileData("100", src, tmp), CheckAnswer("100"), AssertOnQuery { query => - val actualProgress = query.recentProgresses + val actualProgress = query.recentProgress .find(_.numInputRows > 0) .getOrElse(sys.error("Could not find records with data.")) assert(actualProgress.numInputRows === 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 1cd503c6de..b78d1353e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -237,9 +237,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } true } - // `recentProgresses` should not receive too many no data events + // `recentProgress` should not receive too many no data events actions += AssertOnQuery { q => - q.recentProgresses.size > 1 && q.recentProgresses.size <= 11 + q.recentProgress.size > 1 && q.recentProgress.size <= 11 } testStream(input.toDS)(actions: _*) spark.sparkContext.listenerBus.waitUntilEmpty(10000) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 55dd1a5d51..7be2f21691 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -152,7 +152,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { ) } - testQuietly("status, lastProgress, and recentProgresses") { + testQuietly("status, lastProgress, and recentProgress") { import StreamingQuerySuite._ clock = new StreamManualClock @@ -201,7 +201,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while offset is being fetched AddData(inputData, 1, 2), @@ -210,7 +210,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message.startsWith("Getting offsets from")), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch is being fetched AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch @@ -218,14 +218,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message === "Processing new data"), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch is being processed AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message === "Processing new data"), - AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch processing has completed AdvanceManualClock(500), // time = 1100 to unblock job @@ -236,8 +236,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.message === "Waiting for next trigger"), AssertOnQuery { query => assert(query.lastProgress != null) - assert(query.recentProgresses.exists(_.numInputRows > 0)) - assert(query.recentProgresses.last.eq(query.lastProgress)) + assert(query.recentProgress.exists(_.numInputRows > 0)) + assert(query.recentProgress.last.eq(query.lastProgress)) val progress = query.lastProgress assert(progress.id === query.id) @@ -274,7 +274,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), AssertOnQuery { query => - assert(query.recentProgresses.last.eq(query.lastProgress)) + assert(query.recentProgress.last.eq(query.lastProgress)) assert(query.lastProgress.batchId === 1) assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818) true @@ -408,7 +408,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { try { val q = streamingDF.writeStream.format("memory").queryName("test").start() q.processAllAvailable() - q.recentProgresses.head + q.recentProgress.head } finally { spark.streams.active.map(_.stop()) } -- GitLab