---
layout: global
displayTitle: Structured Streaming Programming Guide [Alpha]
title: Structured Streaming Programming Guide
---

* This will become a table of contents (this text will be scraped).
{:toc}

# Overview

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine takes care of running it incrementally and continuously, updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java or Python to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
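To make this batch/streaming symmetry concrete, here is a minimal sketch (not from the original example) that runs the same aggregation first on static data and then on a stream. The input path `/tmp/input` and the single-column schema are hypothetical assumptions for illustration.

{% highlight scala %}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("BatchVsStreaming").getOrCreate()

// Hypothetical schema for the example JSON files
val schema = new StructType().add("value", StringType)

// Batch: read a static directory of JSON files and aggregate it once
val staticCounts = spark.read.schema(schema).json("/tmp/input")
  .groupBy("value").count()

// Streaming: the same query over files arriving in the directory;
// the only change is read -> readStream
val streamingCounts = spark.readStream.schema(schema).json("/tmp/input")
  .groupBy("value").count()
{% endhighlight %}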

Structured Streaming is an ALPHA RELEASE in Spark 2.0 and the APIs are still experimental. In this guide, we will walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.

# Quick Example

Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in Scala/Java/Python, and if you download Spark, you can directly run the example. In any case, let’s walk through the example step by step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the entry point for all Spark-related functionality.

{% highlight scala %}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._
{% endhighlight %}

{% highlight java %}
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();
{% endhighlight %}

{% highlight python %}
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
{% endhighlight %}

Next, let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to calculate word counts.

{% highlight scala %}
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()
{% endhighlight %}

This lines DataFrame represents an unbounded table containing the streaming text data. The table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note that the query is not currently receiving any data; we are just setting up the transformation and have not yet started it. Next, we convert the DataFrame to a Dataset of String using .as[String], so that we can apply the flatMap operation to split each line into multiple words. The resultant words Dataset contains all the words. Finally, we define the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
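As a quick sanity check (a sketch, not part of the original example), you can confirm that lines is indeed a streaming DataFrame and inspect its schema:

{% highlight scala %}
lines.isStreaming    // returns true for DataFrames created with readStream
lines.printSchema()
// root
//  |-- value: string (nullable = true)
{% endhighlight %}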

{% highlight java %}
// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap(
    new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
{% endhighlight %}

This lines DataFrame represents an unbounded table containing the streaming text data. The table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note that the query is not currently receiving any data; we are just setting up the transformation and have not yet started it. Next, we convert the DataFrame to a Dataset of String using .as(Encoders.STRING()), so that we can apply the flatMap operation to split each line into multiple words. The resultant words Dataset contains all the words. Finally, we define the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.

{% highlight python %}
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format('socket') \
    .option('host', 'localhost') \
    .option('port', 9999) \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

# Generate running word count
wordCounts = words.groupBy('word').count()
{% endhighlight %}

This lines DataFrame represents an unbounded table containing the streaming text data. The table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note that the query is not currently receiving any data; we are just setting up the transformation and have not yet started it. Next, we use two built-in SQL functions - split and explode - to split each line into multiple rows with a word each. In addition, we use the function alias to name the new column "word". Finally, we define the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
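For comparison, here is a sketch (not from the original guide) of the same split/explode approach expressed with the untyped DataFrame API in Scala, as an alternative to the typed flatMap shown earlier:

{% highlight scala %}
import org.apache.spark.sql.functions.{explode, split}

// Split each line on spaces and explode the resulting array into rows
val words = lines.select(explode(split(lines("value"), " ")).alias("word"))
val wordCounts = words.groupBy("word").count()
{% endhighlight %}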

We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by outputMode("complete")) to the console every time they are updated, and then start the streaming computation using start().

{% highlight scala %}
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
{% endhighlight %}

{% highlight java %}
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
{% endhighlight %}

{% highlight python %}
# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()
{% endhighlight %}

After this code is executed, the streaming computation will have started in the background. The query object is a handle to that active streaming query, and we wait for the termination of the query using query.awaitTermination() to prevent the process from exiting while the query is active.
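Beyond awaitTermination(), the query handle offers a few other operations; here is a short sketch of ones you might use instead of blocking:

{% highlight scala %}
query.isActive          // whether the query is still running
query.exception         // the exception, if the query terminated with an error
spark.streams.active    // all active streaming queries on this SparkSession
query.stop()            // stop the query
{% endhighlight %}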

To actually execute this example code, you can either compile it into your own Spark application, or simply run the example once you have downloaded Spark. We show the latter here. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using

{% highlight bash %}
$ nc -lk 9999
{% endhighlight %}

Then, in a different terminal, you can start the example by using

{% highlight bash %}
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
{% endhighlight %}

{% highlight bash %}
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
{% endhighlight %}

{% highlight bash %}
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
{% endhighlight %}

Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.

{% highlight bash %}
# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop

...
{% endhighlight %}

{% highlight bash %}
# TERMINAL 2: RUNNING StructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
{% endhighlight %}

{% highlight bash %}
# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
{% endhighlight %}

{% highlight bash %}
# TERMINAL 2: RUNNING structured_network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
{% endhighlight %}

# Programming Model