---
layout: global
displayTitle: Structured Streaming Programming Guide [Alpha]
title: Structured Streaming Programming Guide
---
* This will become a table of contents (this text will be scraped).
{:toc}
# Overview
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java or Python to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
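To make the batch/streaming parity concrete, here is a minimal, illustrative sketch that is not part of the guide's examples; the `data/` directory, its JSON contents, and the `word` column are assumptions, used only to show that the same DataFrame code applies to a static batch read and to a streaming read.

{% highlight scala %}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("BatchVsStreamingSketch").getOrCreate()

// Batch: a static DataFrame from a (hypothetical) directory of JSON files
val staticDF = spark.read.json("data/")
val batchCounts = staticDF.groupBy("word").count()         // an ordinary batch query

// Streaming: the same transformation over an unbounded stream of files arriving
// in that directory; only spark.read becomes spark.readStream
val streamingDF = spark.readStream.schema(staticDF.schema).json("data/")
val streamingCounts = streamingDF.groupBy("word").count()  // a continuously updated result
{% endhighlight %}

The streaming result is not actually computed until a sink is started with `writeStream`, which the quick example below walks through.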
Spark 2.0 is the ALPHA RELEASE of Structured Streaming and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.
# Quick Example
Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in Scala/Java/Python. And if you download Spark, you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.
{% highlight scala %}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._
{% endhighlight %}
{% highlight java %}
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();
{% endhighlight %}
{% highlight python %}
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()
{% endhighlight %}
Next, let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to calculate word counts.
{% highlight scala %}
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()
{% endhighlight %}
This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note that this is not currently receiving any data, as we are just setting up the transformation and have not yet started it. Next, we have converted the DataFrame to a Dataset of String using `.as[String]`, so that we can apply the `flatMap` operation to split each line into multiple words. The resultant `words` Dataset contains all the words. Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
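As an illustrative aside that is not part of the original walkthrough, the same typed transformation can be sanity-checked on a small static Dataset; the two sample lines below are assumptions chosen to mirror the console output shown later.

{% highlight scala %}
import spark.implicits._

// Static stand-in for the unbounded "lines" table; toDS() already gives a
// Dataset[String], so no .as[String] conversion is needed here
val staticLines = Seq("apache spark", "apache hadoop").toDS()

// Identical logic to the streaming query above
val staticWordCounts = staticLines.flatMap(_.split(" ")).groupBy("value").count()

staticWordCounts.show()
// +------+-----+
// | value|count|   (row order may vary)
// +------+-----+
// |apache|    2|
// | spark|    1|
// |hadoop|    1|
// +------+-----+
{% endhighlight %}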
{% highlight java %}
// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap(
    new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
{% endhighlight %}
This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note that this is not currently receiving any data, as we are just setting up the transformation and have not yet started it. Next, we have converted the DataFrame to a Dataset of String using `.as(Encoders.STRING())`, so that we can apply the `flatMap` operation to split each line into multiple words. The resultant `words` Dataset contains all the words. Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
{% highlight python %}
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 9999)\
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

# Generate running word count
wordCounts = words.groupBy('word').count()
{% endhighlight %}
This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note that this is not currently receiving any data, as we are just setting up the transformation and have not yet started it. Next, we have used two built-in SQL functions, `split` and `explode`, to split each line into multiple rows with a word each. In addition, we use the function `alias` to name the new column as "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
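The `split`, `explode`, and `alias` functions are available in the Scala API as well. As an illustrative aside that is not part of the original walkthrough, here is a minimal sketch of the same pattern on a static DataFrame; the sample rows are assumptions.

{% highlight scala %}
import org.apache.spark.sql.functions.{explode, split}
import spark.implicits._

// Static stand-in for the streaming "lines" DataFrame (sample rows are made up)
val staticLines = Seq("apache spark", "apache hadoop").toDF("value")

// split() produces an array of words per line; explode() emits one row per array element
val staticWords = staticLines.select(
  explode(
    split(staticLines("value"), " ")
  ).alias("word")
)

staticWords.show()
// +------+
// |  word|
// +------+
// |apache|
// | spark|
// |apache|
// |hadoop|
// +------+
{% endhighlight %}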
We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by `outputMode("complete")`) to the console every time they are updated. And then start the streaming computation using `start()`.
{% highlight scala %}
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
{% endhighlight %}
{% highlight java %}
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
{% endhighlight %}
{% highlight python %}
# Start running the query that prints the running counts to the console
query = wordCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .start()

query.awaitTermination()
{% endhighlight %}
After this code is executed, the streaming computation will have started in the background. The `query` object is a handle to that active streaming query, and we have decided to wait for the termination of the query using `query.awaitTermination()` to prevent the process from exiting while the query is active.
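As an aside not covered in the original example, the `query` handle offers a few other controls; a minimal Scala sketch:

{% highlight scala %}
// Wait up to 30 seconds for the query to finish; returns true if it terminated in time
val terminated = query.awaitTermination(30000)

// The handle can also be inspected and stopped explicitly
if (query.isActive) {
  query.stop()
}
{% endhighlight %}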
To actually execute this example code, you can either compile the code in your own Spark application, or simply run the example once you have downloaded Spark. We are showing the latter. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using
{% highlight bash %}
$ nc -lk 9999
{% endhighlight %}
Then, in a different terminal, you can start the example using one of the commands shown below. Any lines typed in the terminal running the netcat server will then be counted and printed on screen every second. It will look something like the following.
{% highlight bash %}
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
apache spark
apache hadoop

...
{% endhighlight %}
{% highlight bash %}
# TERMINAL 2: RUNNING StructuredNetworkWordCount
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
{% endhighlight %}
{% highlight bash %}
# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
{% endhighlight %}
{% highlight bash %}
# TERMINAL 2: RUNNING structured_network_wordcount.py
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
{% endhighlight %}