From a71d1364ae87aa388128da34dd0b9b02ff85e458 Mon Sep 17 00:00:00 2001 From: Reynold Xin <rxin@databricks.com> Date: Wed, 1 Jun 2016 10:14:40 -0700 Subject: [PATCH] [SPARK-15686][SQL] Move user-facing streaming classes into sql.streaming ## What changes were proposed in this pull request? This patch moves all user-facing structured streaming classes into sql.streaming. As part of this, I also added some since version annotation to methods and classes that don't have them. ## How was this patch tested? Updated tests to reflect the moves. Author: Reynold Xin <rxin@databricks.com> Closes #13429 from rxin/SPARK-15686. --- python/pyspark/sql/streaming.py | 3 +- python/pyspark/sql/utils.py | 2 +- .../spark/sql/{ => streaming}/OutputMode.java | 3 +- .../spark/sql/InternalOutputModes.scala | 2 ++ .../UnsupportedOperationChecker.scala | 3 +- .../{ => streaming}/JavaOutputModeSuite.java | 2 +- .../analysis/UnsupportedOperationsSuite.scala | 3 +- .../apache/spark/sql/DataFrameWriter.scala | 1 + .../scala/org/apache/spark/sql/Dataset.scala | 1 + .../org/apache/spark/sql/SQLContext.scala | 6 ++-- .../org/apache/spark/sql/SparkSession.scala | 7 ++-- .../spark/sql/execution/SparkStrategies.scala | 3 +- .../execution/datasources/DataSource.scala | 1 + .../ContinuousQueryListenerBus.scala | 11 +++--- .../streaming/IncrementalExecution.scala | 3 +- .../execution/streaming/StreamExecution.scala | 5 +-- .../execution/streaming/TriggerExecutor.scala | 2 +- .../sql/execution/streaming/console.scala | 3 +- .../sql/execution/streaming/memory.scala | 1 + .../spark/sql/internal/SessionState.scala | 3 +- .../apache/spark/sql/sources/interfaces.scala | 1 + .../sql/{ => streaming}/ContinuousQuery.scala | 3 +- .../ContinuousQueryException.scala | 2 +- .../ContinuousQueryListener.scala | 36 ++++++++++++++----- .../ContinuousQueryManager.scala | 9 +++-- .../sql/{ => streaming}/SinkStatus.scala | 2 +- .../sql/{ => streaming}/SourceStatus.scala | 2 +- .../spark/sql/{ => streaming}/Trigger.scala | 18 ++++++++-- .../spark/sql/ProcessingTimeSuite.scala | 1 + .../ProcessingTimeExecutorSuite.scala | 2 +- .../ContinuousQueryListenerSuite.scala | 9 +++-- .../ContinuousQueryManagerSuite.scala | 5 ++- .../sql/streaming/ContinuousQuerySuite.scala | 5 ++- .../sql/streaming/FileStreamSinkSuite.scala | 2 +- .../sql/streaming/FileStreamSourceSuite.scala | 4 +-- .../spark/sql/streaming/FileStressSuite.scala | 4 +-- .../spark/sql/streaming/MemorySinkSuite.scala | 2 +- .../streaming/MemorySourceStressSuite.scala | 4 +-- .../spark/sql/streaming/StreamSuite.scala | 2 +- .../sql/{ => streaming}/StreamTest.scala | 8 +++-- .../streaming/StreamingAggregationSuite.scala | 5 ++- .../test/DataFrameReaderWriterSuite.scala | 4 +-- 42 files changed, 121 insertions(+), 74 deletions(-) rename sql/catalyst/src/main/java/org/apache/spark/sql/{ => streaming}/OutputMode.java (95%) rename sql/catalyst/src/test/java/org/apache/spark/sql/{ => streaming}/JavaOutputModeSuite.java (96%) rename sql/core/src/main/scala/org/apache/spark/sql/{ => streaming}/ContinuousQuery.scala (97%) rename sql/core/src/main/scala/org/apache/spark/sql/{ => streaming}/ContinuousQueryException.scala (98%) rename sql/core/src/main/scala/org/apache/spark/sql/{util => streaming}/ContinuousQueryListener.scala (81%) rename sql/core/src/main/scala/org/apache/spark/sql/{ => streaming}/ContinuousQueryManager.scala (96%) rename sql/core/src/main/scala/org/apache/spark/sql/{ => streaming}/SinkStatus.scala (97%) rename sql/core/src/main/scala/org/apache/spark/sql/{ => streaming}/SourceStatus.scala (97%) rename sql/core/src/main/scala/org/apache/spark/sql/{ => streaming}/Trigger.scala (94%) rename sql/core/src/test/scala/org/apache/spark/sql/{util => streaming}/ContinuousQueryListenerSuite.scala (95%) rename sql/core/src/test/scala/org/apache/spark/sql/{ => streaming}/StreamTest.scala (98%) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 8238b8e7cd..cd75622ced 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -201,7 +201,8 @@ class ProcessingTime(Trigger): self.interval = interval def _to_java_trigger(self, sqlContext): - return sqlContext._sc._jvm.org.apache.spark.sql.ProcessingTime.create(self.interval) + return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create( + self.interval) def _test(): diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index 8c8768f50b..9ddaf78acf 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -71,7 +71,7 @@ def capture_sql_exception(f): raise AnalysisException(s.split(': ', 1)[1], stackTrace) if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '): raise ParseException(s.split(': ', 1)[1], stackTrace) - if s.startswith('org.apache.spark.sql.ContinuousQueryException: '): + if s.startswith('org.apache.spark.sql.streaming.ContinuousQueryException: '): raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace) if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '): raise QueryExecutionException(s.split(': ', 1)[1], stackTrace) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java similarity index 95% rename from sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java rename to sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java index 1936d53e5e..41e2582921 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.spark.sql; +package org.apache.spark.sql.streaming; import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.InternalOutputModes; /** * :: Experimental :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala index 8ef5d9a653..153f9f57fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.streaming.OutputMode + /** * Internal helper class to generate objects representing various [[OutputMode]]s, */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index f4c0347609..8373fa336d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -17,9 +17,10 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.{AnalysisException, InternalOutputModes, OutputMode} +import org.apache.spark.sql.{AnalysisException, InternalOutputModes} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.streaming.OutputMode /** * Analyzes the presence of unsupported operations in a logical plan. diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java similarity index 96% rename from sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java rename to sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java index 1764f3348d..e0a54fe30a 100644 --- a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql; +package org.apache.spark.sql.streaming; import org.junit.Test; diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index c2e3d47450..378cca3644 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{AnalysisException, OutputMode} +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.InternalOutputModes._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.IntegerType /** A dummy command for testing unsupported operations. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 291b8250c9..25678e938d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingA import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.Utils /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 7be49b1749..3a6ec4595e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython +import org.apache.spark.sql.streaming.ContinuousQuery import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 0dc70c0b1c..2e14c5d486 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -30,13 +30,11 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst._ -import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf} import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.streaming.ContinuousQueryManager import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ExecutionListenerManager @@ -645,7 +643,7 @@ class SQLContext private[sql](val sparkSession: SparkSession) /** * Returns a [[ContinuousQueryManager]] that allows managing all the - * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this` context. + * [[org.apache.spark.sql.streaming.ContinuousQuery ContinuousQueries]] active on `this` context. * * @since 2.0.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index dc4b72a6fb..52bedf9dbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -25,7 +25,7 @@ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal -import org.apache.spark.{SparkConf, SparkContext, SparkException} +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging @@ -40,8 +40,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.ui.SQLListener -import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf} +import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState} import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.streaming._ import org.apache.spark.sql.types.{DataType, LongType, StructType} import org.apache.spark.sql.util.ExecutionListenerManager import org.apache.spark.util.Utils @@ -182,7 +183,7 @@ class SparkSession private( /** * :: Experimental :: * Returns a [[ContinuousQueryManager]] that allows managing all the - * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this`. + * [[ContinuousQuery ContinuousQueries]] active on `this`. * * @group basic * @since 2.0.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index e40525287a..7e3e45e56e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.execution.streaming.MemoryPlan import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.ContinuousQuery private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SparkPlanner => @@ -201,7 +202,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { /** * Used to plan aggregation queries that are computed incrementally as part of a - * [[org.apache.spark.sql.ContinuousQuery]]. Currently this rule is injected into the planner + * [[ContinuousQuery]]. Currently this rule is injected into the planner * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]] */ object StatefulAggregationStrategy extends Strategy { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 814880b0e0..93f1ad01bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{CalendarIntervalType, StructType} import org.apache.spark.util.Utils diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala index b1d24b6cfc..2a1be09693 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent} -import org.apache.spark.sql.util.ContinuousQueryListener -import org.apache.spark.sql.util.ContinuousQueryListener._ +import org.apache.spark.sql.streaming.ContinuousQueryListener import org.apache.spark.util.ListenerBus /** @@ -30,7 +29,10 @@ import org.apache.spark.util.ListenerBus * dispatch them to ContinuousQueryListener. */ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus) - extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] { + extends SparkListener + with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] { + + import ContinuousQueryListener._ sparkListenerBus.addListener(this) @@ -74,7 +76,8 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus) * listener bus. */ private case class WrappedContinuousQueryListenerEvent( - streamingListenerEvent: ContinuousQueryListener.Event) extends SparkListenerEvent { + streamingListenerEvent: ContinuousQueryListener.Event) + extends SparkListenerEvent { // Do not log streaming events in event log as history server does not support these events. protected[spark] override def logEvent: Boolean = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 5c86049851..bc0e443ca7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.sql.{InternalOutputModes, OutputMode, SparkSession} +import org.apache.spark.sql.{InternalOutputModes, SparkSession} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode} +import org.apache.spark.sql.streaming.OutputMode /** * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index ab0900d7f6..16d38a2f7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -33,8 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.util.ContinuousQueryListener -import org.apache.spark.sql.util.ContinuousQueryListener._ +import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} /** @@ -54,6 +53,8 @@ class StreamExecution( val outputMode: OutputMode) extends ContinuousQuery with Logging { + import org.apache.spark.sql.streaming.ContinuousQueryListener._ + /** * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index 569907b369..ac510df209 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.internal.Logging -import org.apache.spark.sql.ProcessingTime +import org.apache.spark.sql.streaming.ProcessingTime import org.apache.spark.util.{Clock, SystemClock} trait TriggerExecutor { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index 391f1e54b7..2ec2a3c3c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -18,8 +18,9 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, OutputMode, SQLContext} +import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider} +import org.apache.spark.sql.streaming.OutputMode class ConsoleSink(options: Map[String, String]) extends Sink with Logging { // Number of rows to display, by default 20 rows diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index e4a95e7335..4496f41615 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LeafNode +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType object MemoryStream { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 4c7bbf04bc..b2db377ec7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.AnalyzeTableCommand import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource} +import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryManager} import org.apache.spark.sql.util.ExecutionListenerManager @@ -142,7 +143,7 @@ private[sql] class SessionState(sparkSession: SparkSession) { lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager /** - * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s. + * Interface to start and stop [[ContinuousQuery]]s. */ lazy val continuousQueryManager: ContinuousQueryManager = { new ContinuousQueryManager(sparkSession) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 3d4edbb93d..d2077a07f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.streaming.{Sink, Source} +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala similarity index 97% rename from sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala rename to sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala index 4d5afe2eb5..451cfd85e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.streaming import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.SparkSession /** * :: Experimental :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala similarity index 98% rename from sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala rename to sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala index fec38629d9..5196c5a537 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.streaming import org.apache.spark.annotation.Experimental import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala similarity index 81% rename from sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala rename to sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala index ba1facf11b..6bdd513288 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala @@ -15,20 +15,22 @@ * limitations under the License. */ -package org.apache.spark.sql.util +package org.apache.spark.sql.streaming import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.ContinuousQuery -import org.apache.spark.sql.util.ContinuousQueryListener._ /** * :: Experimental :: * Interface for listening to events related to [[ContinuousQuery ContinuousQueries]]. * @note The methods are not thread-safe as they may be called from different threads. + * + * @since 2.0.0 */ @Experimental abstract class ContinuousQueryListener { + import ContinuousQueryListener._ + /** * Called when a query is started. * @note This is called synchronously with @@ -36,6 +38,7 @@ abstract class ContinuousQueryListener { * that is, `onQueryStart` will be called on all listeners before * `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please * don't block this method as it will block your query. + * @since 2.0.0 */ def onQueryStarted(queryStarted: QueryStarted): Unit @@ -46,10 +49,14 @@ abstract class ContinuousQueryListener { * latest no matter when this method is called. Therefore, the status of [[ContinuousQuery]] * may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]] * is terminated when you are processing [[QueryProgress]]. + * @since 2.0.0 */ def onQueryProgress(queryProgress: QueryProgress): Unit - /** Called when a query is stopped, with or without error */ + /** + * Called when a query is stopped, with or without error. + * @since 2.0.0 + */ def onQueryTerminated(queryTerminated: QueryTerminated): Unit } @@ -57,19 +64,32 @@ abstract class ContinuousQueryListener { /** * :: Experimental :: * Companion object of [[ContinuousQueryListener]] that defines the listener events. + * @since 2.0.0 */ @Experimental object ContinuousQueryListener { - /** Base type of [[ContinuousQueryListener]] events */ + /** + * Base type of [[ContinuousQueryListener]] events. + * @since 2.0.0 + */ trait Event - /** Event representing the start of a query */ + /** + * Event representing the start of a query. + * @since 2.0.0 + */ class QueryStarted private[sql](val query: ContinuousQuery) extends Event - /** Event representing any progress updates in a query */ + /** + * Event representing any progress updates in a query. + * @since 2.0.0 + */ class QueryProgress private[sql](val query: ContinuousQuery) extends Event - /** Event representing that termination of a query */ + /** + * Event representing that termination of a query. + * @since 2.0.0 + */ class QueryTerminated private[sql](val query: ContinuousQuery) extends Event } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala similarity index 96% rename from sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala rename to sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala index c686400150..1bfdd2da4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala @@ -15,22 +15,21 @@ * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.streaming import scala.collection.mutable import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.util.ContinuousQueryListener import org.apache.spark.util.{Clock, SystemClock} /** * :: Experimental :: - * A class to manage all the [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active - * on a [[SparkSession]]. + * A class to manage all the [[ContinuousQuery]] active on a [[SparkSession]]. * * @since 2.0.0 */ @@ -147,7 +146,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { /** * Register a [[ContinuousQueryListener]] to receive up-calls for life cycle events of - * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]]. + * [[ContinuousQuery]]. * * @since 2.0.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala similarity index 97% rename from sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala rename to sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala index 5a9852809c..79ddf01042 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.streaming import org.apache.spark.annotation.Experimental import org.apache.spark.sql.execution.streaming.{Offset, Sink} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala similarity index 97% rename from sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala rename to sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala index 2479e67e36..8fccd5b7a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.streaming import org.apache.spark.annotation.Experimental import org.apache.spark.sql.execution.streaming.{Offset, Source} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala similarity index 94% rename from sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala rename to sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala index 256e8a47a4..d3fdbac576 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.streaming import java.util.concurrent.TimeUnit @@ -29,9 +29,11 @@ import org.apache.spark.unsafe.types.CalendarInterval /** * :: Experimental :: * Used to indicate how often results should be produced by a [[ContinuousQuery]]. + * + * @since 2.0.0 */ @Experimental -sealed trait Trigger {} +sealed trait Trigger /** * :: Experimental :: @@ -53,6 +55,8 @@ sealed trait Trigger {} * import java.util.concurrent.TimeUnit * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) * }}} + * + * @since 2.0.0 */ @Experimental case class ProcessingTime(intervalMs: Long) extends Trigger { @@ -62,6 +66,8 @@ case class ProcessingTime(intervalMs: Long) extends Trigger { /** * :: Experimental :: * Used to create [[ProcessingTime]] triggers for [[ContinuousQuery]]s. + * + * @since 2.0.0 */ @Experimental object ProcessingTime { @@ -73,6 +79,8 @@ object ProcessingTime { * {{{ * df.write.trigger(ProcessingTime("10 seconds")) * }}} + * + * @since 2.0.0 */ def apply(interval: String): ProcessingTime = { if (StringUtils.isBlank(interval)) { @@ -101,6 +109,8 @@ object ProcessingTime { * import scala.concurrent.duration._ * df.write.trigger(ProcessingTime(10.seconds)) * }}} + * + * @since 2.0.0 */ def apply(interval: Duration): ProcessingTime = { new ProcessingTime(interval.toMillis) @@ -113,6 +123,8 @@ object ProcessingTime { * {{{ * df.write.trigger(ProcessingTime.create("10 seconds")) * }}} + * + * @since 2.0.0 */ def create(interval: String): ProcessingTime = { apply(interval) @@ -126,6 +138,8 @@ object ProcessingTime { * import java.util.concurrent.TimeUnit * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) * }}} + * + * @since 2.0.0 */ def create(interval: Long, unit: TimeUnit): ProcessingTime = { new ProcessingTime(unit.toMillis(interval)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala index 0d18a645f6..52c200796c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration._ import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.streaming.ProcessingTime class ProcessingTimeSuite extends SparkFunSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala index 7f99d303ba..00d5e051de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.{CountDownLatch, TimeUnit} import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.ProcessingTime +import org.apache.spark.sql.streaming.ProcessingTime import org.apache.spark.util.{Clock, ManualClock, SystemClock} class ProcessingTimeExecutorSuite extends SparkFunSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala similarity index 95% rename from sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala index 8788898fc8..cdd97da8ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.util +package org.apache.spark.sql.streaming import java.util.concurrent.ConcurrentLinkedQueue @@ -26,14 +26,13 @@ import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ -import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.util.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated} -class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { + +class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { import testImplicits._ + import ContinuousQueryListener._ after { spark.streams.active.foreach(_.stop()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index b75c3ea106..c1e4970b3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -28,12 +28,11 @@ import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException -import org.apache.spark.sql.{ContinuousQuery, Dataset, OutputMode, StreamTest} +import org.apache.spark.sql.Dataset import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils -class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { +class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter { import AwaitTerminationTester._ import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala index f469cde6be..e4ca86d9d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala @@ -18,11 +18,10 @@ package org.apache.spark.sql.streaming import org.apache.spark.SparkException -import org.apache.spark.sql.StreamTest import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, MemoryStream, StreamExecution} -import org.apache.spark.sql.test.SharedSQLContext -class ContinuousQuerySuite extends StreamTest with SharedSQLContext { + +class ContinuousQuerySuite extends StreamTest { import AwaitTerminationTester._ import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 3d8dcaf5a5..1c73208736 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils -class FileStreamSinkSuite extends StreamTest with SharedSQLContext { +class FileStreamSinkSuite extends StreamTest { import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 1d784f1f4e..f681b8878d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -137,7 +137,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext { val valueSchema = new StructType().add("value", StringType) } -class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { +class FileStreamSourceSuite extends FileStreamSourceTest { import testImplicits._ @@ -594,7 +594,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } } -class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext { +class FileStreamSourceStressTestSuite extends FileStreamSourceTest { import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala index 4efb7cf52d..1c0fb34dd0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala @@ -23,9 +23,7 @@ import java.util.UUID import scala.util.Random import scala.util.control.NonFatal -import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, StreamTest} import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils /** @@ -38,7 +36,7 @@ import org.apache.spark.util.Utils * * At the end, the resulting files are loaded and the answer is checked. */ -class FileStressSuite extends StreamTest with SharedSQLContext { +class FileStressSuite extends StreamTest { import testImplicits._ testQuietly("fault tolerance stress test - unpartitioned output") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala index e5bd0b4744..df76499fa2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils -class MemorySinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { +class MemorySinkSuite extends StreamTest with BeforeAndAfter { import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala index 81760d2aa8..7f2972edea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala @@ -17,11 +17,9 @@ package org.apache.spark.sql.streaming -import org.apache.spark.sql.StreamTest import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.test.SharedSQLContext -class MemorySourceStressSuite extends StreamTest with SharedSQLContext { +class MemorySourceStressSuite extends StreamTest { import testImplicits._ test("memory stress test") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index c17cb1de6c..9414b1ce40 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.ManualClock -class StreamSuite extends StreamTest with SharedSQLContext { +class StreamSuite extends StreamTest { import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala similarity index 98% rename from sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala rename to sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index b033725f18..dd8672aa64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.streaming import java.lang.Thread.UncaughtExceptionHandler @@ -33,10 +33,12 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ +import org.apache.spark.sql.{Dataset, Encoder, QueryTest, Row} import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} /** @@ -63,7 +65,7 @@ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} * avoid hanging forever in the case of failures. However, individual suites can change this * by overriding `streamingTimeout`. */ -trait StreamTest extends QueryTest with Timeouts { +trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { /** How long to wait for an active stream to catch up when checking a result. */ val streamingTimeout = 10.seconds @@ -523,7 +525,7 @@ trait StreamTest extends QueryTest with Timeouts { case class ExpectException[E <: Exception]()(implicit val t: ClassTag[E]) extends ExpectedBehavior - private val DEFAULT_TEST_TIMEOUT = 1 second + private val DEFAULT_TEST_TIMEOUT = 1.second def test( expectedBehavior: ExpectedBehavior, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 322bbb9ea0..1f174aee8c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -20,19 +20,18 @@ package org.apache.spark.sql.streaming import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, StreamTest} +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.InternalOutputModes._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ -import org.apache.spark.sql.test.SharedSQLContext object FailureSinglton { var firstTime = true } -class StreamingAggregationSuite extends StreamTest with SharedSQLContext with BeforeAndAfterAll { +class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { override def afterAll(): Unit = { super.afterAll() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala index 38a0534ab6..a2aac69064 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} -import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, StreamTest} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils @@ -101,7 +101,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { } } -class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { +class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter { private def newMetadataDir = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath -- GitLab