Skip to content
Snippets Groups Projects
Commit 4f77c062 authored by Yash Sharma's avatar Yash Sharma Committed by Sean Owen
Browse files

[SPARK-20855][Docs][DStream] Update the Spark kinesis docs to use the...

[SPARK-20855][Docs][DStream] Update the Spark kinesis docs to use the KinesisInputDStream builder instead of deprecated KinesisUtils

## What changes were proposed in this pull request?

The examples and docs for Spark-Kinesis integrations use the deprecated KinesisUtils. We should update the docs to use the KinesisInputDStream builder to create DStreams.

## How was this patch tested?

The patch primarily updates the documents. The patch will also need to make changes to the Spark-Kinesis examples. The examples need to be tested.

Author: Yash Sharma <ysharma@atlassian.com>

Closes #18071 from yssharma/ysharma/kinesis_docs.
parent 7f295059
No related branches found
No related tags found
No related merge requests found
...@@ -24,41 +24,58 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m ...@@ -24,41 +24,58 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
For Python applications, you will have to add this above library and its dependencies when deploying your application. See the *Deploying* subsection below. For Python applications, you will have to add this above library and its dependencies when deploying your application. See the *Deploying* subsection below.
**Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.** **Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream of byte array as follows: 2. **Programming:** In the streaming application code, import `KinesisInputDStream` and create the input DStream of byte array as follows:
<div class="codetabs"> <div class="codetabs">
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
import org.apache.spark.streaming.Duration import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis._ import org.apache.spark.streaming.kinesis.KinesisInputDStream
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
val kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], val kinesisStream = KinesisInputDStream.builder
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2) .streamingContext(streamingContext)
.endpointUrl([endpoint URL])
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$) .regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisInputDStream)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the [Running the Example](#running-the-example) subsection for instructions on how to run the example. and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the [Running the Example](#running-the-example) subsection for instructions on how to run the example.
</div> </div>
<div data-lang="java" markdown="1"> <div data-lang="java" markdown="1">
import org.apache.spark.streaming.Duration; import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.*; import org.apache.spark.streaming.kinesis.KinesisInputDStream
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream( import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2); KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build();
See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html) See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example. and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
</div> </div>
<div data-lang="python" markdown="1"> <div data-lang="python" markdown="1">
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
kinesisStream = KinesisUtils.createStream( kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2) [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils) See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example. and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
...@@ -70,27 +87,40 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m ...@@ -70,27 +87,40 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
<div class="codetabs"> <div class="codetabs">
<div data-lang="scala" markdown="1"> <div data-lang="scala" markdown="1">
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis._ import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
val kinesisStream = KinesisUtils.createStream[T]( val kinesisStream = KinesisInputDStream.builder
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], .streamingContext(streamingContext)
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2, .endpointUrl([endpoint URL])
[message handler]) .regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.buildWithMessageHandler([message handler])
</div> </div>
<div data-lang="java" markdown="1"> <div data-lang="java" markdown="1">
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.*; import org.apache.spark.streaming.Seconds
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import org.apache.spark.streaming.StreamingContext
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
JavaReceiverInputDStream<T> kinesisStream = KinesisUtils.createStream(
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2, .streamingContext(streamingContext)
[message handler], [class T]); .endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
.initialPositionInStream([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.buildWithMessageHandler([message handler]);
</div> </div>
</div> </div>
......
...@@ -23,7 +23,6 @@ import java.nio.ByteBuffer ...@@ -23,7 +23,6 @@ import java.nio.ByteBuffer
import scala.util.Random import scala.util.Random
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.PutRecordRequest import com.amazonaws.services.kinesis.model.PutRecordRequest
...@@ -34,7 +33,7 @@ import org.apache.spark.internal.Logging ...@@ -34,7 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisUtils import org.apache.spark.streaming.kinesis.KinesisInputDStream
/** /**
...@@ -135,8 +134,16 @@ object KinesisWordCountASL extends Logging { ...@@ -135,8 +134,16 @@ object KinesisWordCountASL extends Logging {
// Create the Kinesis DStreams // Create the Kinesis DStreams
val kinesisStreams = (0 until numStreams).map { i => val kinesisStreams = (0 until numStreams).map { i =>
KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, KinesisInputDStream.builder
InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) .streamingContext(ssc)
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.checkpointAppName(appName)
.checkpointInterval(kinesisCheckpointInterval)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.build()
} }
// Union all the streams // Union all the streams
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment