diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index 9709bd3d6574d7290cf4845169bf98fc2ea01421..678b0643fd7067e9e4eebec79387c3bf25005b6b 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -24,41 +24,58 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m For Python applications, you will have to add this above library and its dependencies when deploying your application. See the *Deploying* subsection below. **Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.** -2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream of byte array as follows: +2. **Programming:** In the streaming application code, import `KinesisInputDStream` and create the input DStream of byte array as follows: <div class="codetabs"> <div data-lang="scala" markdown="1"> - import org.apache.spark.streaming.Duration - import org.apache.spark.streaming.kinesis._ - import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream - - val kinesisStream = KinesisUtils.createStream( - streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], - [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2) - - See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$) + import org.apache.spark.storage.StorageLevel + import org.apache.spark.streaming.kinesis.KinesisInputDStream + import org.apache.spark.streaming.{Seconds, StreamingContext} + import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + val kinesisStream = KinesisInputDStream.builder + .streamingContext(streamingContext) + .endpointUrl([endpoint URL]) + .regionName([region name]) + .streamName([streamName]) + .initialPositionInStream([initial position]) + 
.checkpointAppName([Kinesis app name]) + .checkpointInterval([checkpoint interval]) + .storageLevel(StorageLevel.MEMORY_AND_DISK_2) + .build() + + See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisInputDStream) and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the [Running the Example](#running-the-example) subsection for instructions on how to run the example. </div> <div data-lang="java" markdown="1"> - import org.apache.spark.streaming.Duration; - import org.apache.spark.streaming.kinesis.*; - import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; - - JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream( - streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], - [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2); + import org.apache.spark.storage.StorageLevel; + import org.apache.spark.streaming.kinesis.KinesisInputDStream; + import org.apache.spark.streaming.Seconds; + import org.apache.spark.streaming.StreamingContext; + import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; + + KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() + .streamingContext(streamingContext) + .endpointUrl([endpoint URL]) + .regionName([region name]) + .streamName([streamName]) + .initialPositionInStream([initial position]) + .checkpointAppName([Kinesis app name]) + .checkpointInterval([checkpoint interval]) + .storageLevel(StorageLevel.MEMORY_AND_DISK_2) + .build(); See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html) and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). 
Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example. </div> <div data-lang="python" markdown="1"> - from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream + from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream - kinesisStream = KinesisUtils.createStream( - streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], - [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2) + kinesisStream = KinesisUtils.createStream( + streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], + [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2) See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils) and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example. 
@@ -70,27 +87,40 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m <div class="codetabs"> <div data-lang="scala" markdown="1"> - - import org.apache.spark.streaming.Duration - import org.apache.spark.streaming.kinesis._ - import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream - - val kinesisStream = KinesisUtils.createStream[T]( - streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], - [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2, - [message handler]) + import org.apache.spark.storage.StorageLevel + import org.apache.spark.streaming.kinesis.KinesisInputDStream + import org.apache.spark.streaming.{Seconds, StreamingContext} + import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + + val kinesisStream = KinesisInputDStream.builder + .streamingContext(streamingContext) + .endpointUrl([endpoint URL]) + .regionName([region name]) + .streamName([streamName]) + .initialPositionInStream([initial position]) + .checkpointAppName([Kinesis app name]) + .checkpointInterval([checkpoint interval]) + .storageLevel(StorageLevel.MEMORY_AND_DISK_2) + .buildWithMessageHandler([message handler]) </div> <div data-lang="java" markdown="1"> - - import org.apache.spark.streaming.Duration; - import org.apache.spark.streaming.kinesis.*; - import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; - - JavaReceiverInputDStream<T> kinesisStream = KinesisUtils.createStream( - streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], - [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2, - [message handler], [class T]); + import org.apache.spark.storage.StorageLevel; + import org.apache.spark.streaming.kinesis.KinesisInputDStream; + import org.apache.spark.streaming.Seconds; + import org.apache.spark.streaming.StreamingContext; + import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; + + KinesisInputDStream<T> kinesisStream = KinesisInputDStream.builder() + .streamingContext(streamingContext) + .endpointUrl([endpoint URL]) + .regionName([region name]) + .streamName([streamName]) + .initialPositionInStream([initial position]) + .checkpointAppName([Kinesis app name]) + .checkpointInterval([checkpoint interval]) + .storageLevel(StorageLevel.MEMORY_AND_DISK_2) + .buildWithMessageHandler([message handler]); </div> </div> diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala index f14117b708a0d1b3c87eccd63a14fda6a6993304..cde2c4b04c0c768a4185876c1267eaf25416089e 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala @@ -23,7 +23,6 @@ import java.nio.ByteBuffer import scala.util.Random import com.amazonaws.auth.DefaultAWSCredentialsProviderChain -import com.amazonaws.regions.RegionUtils import com.amazonaws.services.kinesis.AmazonKinesisClient import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.model.PutRecordRequest @@ -34,7 +33,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions -import org.apache.spark.streaming.kinesis.KinesisUtils +import org.apache.spark.streaming.kinesis.KinesisInputDStream /** @@ -135,8 +134,16 @@ object KinesisWordCountASL extends Logging { // Create the Kinesis DStreams val kinesisStreams = (0 until numStreams).map { i => - KinesisUtils.createStream(ssc, appName, streamName, 
endpointUrl, regionName, - InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) + KinesisInputDStream.builder + .streamingContext(ssc) + .streamName(streamName) + .endpointUrl(endpointUrl) + .regionName(regionName) + .initialPositionInStream(InitialPositionInStream.LATEST) + .checkpointAppName(appName) + .checkpointInterval(kinesisCheckpointInterval) + .storageLevel(StorageLevel.MEMORY_AND_DISK_2) + .build() } // Union all the streams