Commit 3a600386 authored by Tathagata Das

[SPARK-7692] Updated Kinesis examples

- Updated Kinesis examples to use stable API
- Cleaned up comments, etc.
- Renamed KinesisWordCountProducerASL to KinesisWordProducerASL

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6249 from tdas/kinesis-examples and squashes the following commits:

7cc307b [Tathagata Das] More tweaks
f080872 [Tathagata Das] More cleanup
841987f [Tathagata Das] Small update
011cbe2 [Tathagata Das] More fixes
b0d74f9 [Tathagata Das] Updated examples.
parent 0a7a94ea
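
The "stable API" referred to in the commit message is the KinesisUtils.createStream overload that takes the Kinesis application name and region name explicitly, which both updated examples below call. A minimal Scala sketch of that call, with argument values borrowed from the examples (it assumes spark-streaming-kinesis-asl is on the classpath and that AWS credentials are available):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

val batchInterval = Milliseconds(2000)
val ssc = new StreamingContext(new SparkConf().setAppName("KinesisWordCountASL"), batchInterval)
val stream = KinesisUtils.createStream(
  ssc,
  "myAppName",                                // Kinesis app name; backs the KCL checkpoint table in DynamoDB
  "mySparkStream",                            // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com",  // Kinesis endpoint URL
  "us-east-1",                                // region name
  InitialPositionInStream.LATEST,             // where to start reading when no checkpoint exists
  batchInterval,                              // Kinesis checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2)             // storage level for received blocks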
JavaKinesisWordCountASL.java (as updated by this commit):

@@ -20,6 +20,7 @@
import java.util.List;
import java.util.regex.Pattern;

import com.amazonaws.regions.RegionUtils;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;

@@ -40,140 +41,146 @@
import com.google.common.collect.Lists;

/**
 * Consumes messages from an Amazon Kinesis stream and does wordcount.
 *
 * This example spins up 1 Kinesis Receiver per shard for the given stream.
 * It then starts pulling from the last checkpointed sequence number of the given stream.
 *
 * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url]
 *   [app-name] is the name of the consumer app, used to track the read data in DynamoDB
 *   [stream-name] name of the Kinesis stream (ie. mySparkStream)
 *   [endpoint-url] endpoint of the Kinesis service
 *     (e.g. https://kinesis.us-east-1.amazonaws.com)
 *
 * Example:
 *   # export AWS keys if necessary
 *   $ export AWS_ACCESS_KEY_ID=[your-access-key]
 *   $ export AWS_SECRET_KEY=[your-secret-key]
 *
 *   # run the example
 *   $ SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \
 *       https://kinesis.us-east-1.amazonaws.com
 *
 * There is a companion helper class called KinesisWordProducerASL which puts dummy data
 * onto the Kinesis stream.
 *
 * This code uses the DefaultAWSCredentialsProviderChain to find credentials
 * in the following order:
 *   Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
 *   Java System Properties - aws.accessKeyId and aws.secretKey
 *   Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
 *   Instance profile credentials - delivered through the Amazon EC2 metadata service
 * For more information, see
 * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
 *
 * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
 * the Kinesis Spark Streaming integration.
 */
public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
  private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
  private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);

  public static void main(String[] args) {
    // Check that all required args were passed in.
    if (args.length != 3) {
      System.err.println(
          "Usage: JavaKinesisWordCountASL <app-name> <stream-name> <endpoint-url>\n\n" +
          "    <app-name> is the name of the app, used to track the read data in DynamoDB\n" +
          "    <stream-name> is the name of the Kinesis stream\n" +
          "    <endpoint-url> is the endpoint of the Kinesis service\n" +
          "                   (e.g. https://kinesis.us-east-1.amazonaws.com)\n" +
          "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" +
          "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" +
          "details.\n"
      );
      System.exit(1);
    }

    // Set default log4j logging level to WARN to hide Spark logs
    StreamingExamples.setStreamingLogLevels();

    // Populate the appropriate variables from the given args
    String kinesisAppName = args[0];
    String streamName = args[1];
    String endpointUrl = args[2];

    // Create a Kinesis client in order to determine the number of shards for the given stream
    AmazonKinesisClient kinesisClient =
        new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
    kinesisClient.setEndpoint(endpointUrl);
    int numShards =
        kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();

    // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    int numStreams = numShards;

    // Spark Streaming batch interval
    Duration batchInterval = new Duration(2000);

    // Kinesis checkpoint interval. Same as batchInterval for this example.
    Duration kinesisCheckpointInterval = batchInterval;

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName();

    // Setup the Spark config and StreamingContext
    SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);

    // Create the Kinesis DStreams
    List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
    for (int i = 0; i < numStreams; i++) {
      streamsList.add(
          KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
              InitialPositionInStream.LATEST, kinesisCheckpointInterval,
              StorageLevel.MEMORY_AND_DISK_2())
      );
    }

    // Union all the streams if there is more than 1 stream
    JavaDStream<byte[]> unionStreams;
    if (streamsList.size() > 1) {
      unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
    } else {
      // Otherwise, just use the 1 stream
      unionStreams = streamsList.get(0);
    }

    // Convert each line of Array[Byte] to String, and split into words
    JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
      @Override
      public Iterable<String> call(byte[] line) {
        return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
      }
    });

    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }
    ).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        }
    );

    // Print the first 10 wordCounts
    wordCounts.print();

    // Start the streaming context and await termination
    jssc.start();
    jssc.awaitTermination();
  }
}
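
A note carried over from the earlier javadoc of this example: each receiver occupies one thread/core, so the application needs at least one more thread than the number of receivers, otherwise no thread is left to process the received data. The committed examples leave the master URL to spark-submit/run-example; a sketch of how a local master could be set for testing (the shard count here is an assumed value, for illustration only):

import org.apache.spark.SparkConf

val numStreams = 2  // assumed number of shards/receivers, for illustration only
val sparkConfig = new SparkConf()
  .setAppName("KinesisWordCountASL")
  .setMaster(s"local[${numStreams + 1}]")  // one thread per receiver, plus one for processing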
KinesisWordCountASL.scala (as updated by this commit):

@@ -18,213 +18,238 @@
package org.apache.spark.examples.streaming

import java.nio.ByteBuffer

import scala.util.Random

import com.amazonaws.auth.{DefaultAWSCredentialsProviderChain, BasicAWSCredentials}
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.log4j.{Level, Logger}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisUtils

/**
 * Consumes messages from an Amazon Kinesis stream and does wordcount.
 *
 * This example spins up 1 Kinesis Receiver per shard for the given stream.
 * It then starts pulling from the last checkpointed sequence number of the given stream.
 *
 * Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
 *   <app-name> is the name of the consumer app, used to track the read data in DynamoDB
 *   <stream-name> name of the Kinesis stream (ie. mySparkStream)
 *   <endpoint-url> endpoint of the Kinesis service
 *     (e.g. https://kinesis.us-east-1.amazonaws.com)
 *
 * Example:
 *   # export AWS keys if necessary
 *   $ export AWS_ACCESS_KEY_ID=<your-access-key>
 *   $ export AWS_SECRET_KEY=<your-secret-key>
 *
 *   # run the example
 *   $ SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \
 *       https://kinesis.us-east-1.amazonaws.com
 *
 * There is a companion helper class called KinesisWordProducerASL which puts dummy data
 * onto the Kinesis stream.
 *
 * This code uses the DefaultAWSCredentialsProviderChain to find credentials
 * in the following order:
 *   Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
 *   Java System Properties - aws.accessKeyId and aws.secretKey
 *   Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
 *   Instance profile credentials - delivered through the Amazon EC2 metadata service
 * For more information, see
 * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
 *
 * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
 * the Kinesis Spark Streaming integration.
 */
object KinesisWordCountASL extends Logging {
  def main(args: Array[String]) {
    // Check that all required args were passed in.
    if (args.length != 3) {
      System.err.println(
        """
          |Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
          |
          |    <app-name> is the name of the consumer app, used to track the read data in DynamoDB
          |    <stream-name> is the name of the Kinesis stream
          |    <endpoint-url> is the endpoint of the Kinesis service
          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
          |
          |Generate input data for the Kinesis stream using the example KinesisWordProducerASL.
          |See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more
          |details.
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Populate the appropriate variables from the given args
    val Array(appName, streamName, endpointUrl) = args

    // Determine the number of shards from the stream using the low-level Kinesis Client
    // from the AWS Java SDK.
    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
    require(credentials != null,
      "No AWS credentials found. Please specify credentials using one of the methods specified " +
        "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
    val kinesisClient = new AmazonKinesisClient(credentials)
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

    // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
    // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
    // then the shards will be automatically distributed among the receivers and each receiver
    // will receive data from multiple shards.
    val numStreams = numShards

    // Spark Streaming batch interval
    val batchInterval = Milliseconds(2000)

    // Kinesis checkpoint interval is the interval at which DynamoDB is updated with the
    // sequence numbers of records that have been received. Same as batchInterval for this example.
    val kinesisCheckpointInterval = batchInterval

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()

    // Setup the SparkConfig and StreamingContext
    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASL")
    val ssc = new StreamingContext(sparkConfig, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
    }

    // Union all the streams
    val unionStreams = ssc.union(kinesisStreams)

    // Convert each line of Array[Byte] to String, and split into words
    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))

    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

    // Print the first 10 wordCounts
    wordCounts.print()

    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}

/**
 * Usage: KinesisWordProducerASL <stream-name> <endpoint-url> \
 *   <records-per-sec> <words-per-record>
 *
 *   <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
 *   <endpoint-url> is the endpoint of the Kinesis service
 *     (ie. https://kinesis.us-east-1.amazonaws.com)
 *   <records-per-sec> is the rate of records per second to put onto the stream
 *   <words-per-record> is the number of words per record
 *
 * Example:
 *   $ SPARK_HOME/bin/run-example streaming.KinesisWordProducerASL mySparkStream \
 *       https://kinesis.us-east-1.amazonaws.com 10 5
 */
object KinesisWordProducerASL {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println(
        """
          |Usage: KinesisWordProducerASL <stream-name> <endpoint-url> <records-per-sec> <words-per-record>
          |
          |    <stream-name> is the name of the Kinesis stream
          |    <endpoint-url> is the endpoint of the Kinesis service
          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
          |    <records-per-sec> is the rate of records per second to put onto the stream
          |    <words-per-record> is the number of words per record
          |
        """.stripMargin)
      System.exit(1)
    }

    // Set default log4j logging level to WARN to hide Spark logs
    StreamingExamples.setStreamingLogLevels()

    // Populate the appropriate variables from the given args
    val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args

    // Generate the records and return the totals
    val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt)

    // Print the array of (word, total) tuples
    println("Totals for the words sent")
    totals.foreach(println(_))
  }

  def generate(stream: String,
      endpoint: String,
      recordsPerSecond: Int,
      wordsPerRecord: Int): Seq[(String, Int)] = {

    val randomWords = List("spark", "you", "are", "my", "father")
    val totals = scala.collection.mutable.Map[String, Int]()

    // Create the low-level Kinesis Client from the AWS Java SDK.
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpoint)

    println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
      s" $recordsPerSecond records per second and $wordsPerRecord words per record")

    // Iterate and put records onto the stream per the given recordsPerSecond and wordsPerRecord
    for (i <- 1 to 10) {
      // Generate recordsPerSecond records to put onto the stream
      val records = (1 to recordsPerSecond.toInt).foreach { recordNum =>
        // Randomly generate wordsPerRecord number of words
        val data = (1 to wordsPerRecord.toInt).map(x => {
          // Get a random index to a word
          val randomWordIdx = Random.nextInt(randomWords.size)
          val randomWord = randomWords(randomWordIdx)

          // Increment total count to compare to server counts later
          totals(randomWord) = totals.getOrElse(randomWord, 0) + 1

          randomWord
        }).mkString(" ")

        // Create a partitionKey based on recordNum
        val partitionKey = s"partitionKey-$recordNum"

        // Create a PutRecordRequest with an Array[Byte] version of the data
        val putRecordRequest = new PutRecordRequest().withStreamName(stream)
          .withPartitionKey(partitionKey)
          .withData(ByteBuffer.wrap(data.getBytes()))

        // Put the record onto the stream and capture the PutRecordResult
        val putRecordResult = kinesisClient.putRecord(putRecordRequest)
      }

      // Sleep for a second
      Thread.sleep(1000)
      println("Sent " + recordsPerSecond + " records")
    }

    // Convert the totals to a sorted (word, total) sequence
    totals.toSeq.sortBy(_._1)
  }
}
@@ -233,8 +258,7 @@
 * This has been lifted from the examples/ project to remove the circular dependency.
 */
private[streaming] object StreamingExamples extends Logging {
  // Set reasonable logging levels for streaming if the user has not configured log4j.
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
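
Both examples resolve credentials through the DefaultAWSCredentialsProviderChain, as described in the doc comments above. The updated imports also bring in BasicAWSCredentials; for completeness, a sketch of how explicit keys could be handed to the low-level client instead (the key strings are placeholders, and this variant is not part of the committed examples):

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.kinesis.AmazonKinesisClient

// Build the low-level Kinesis client from explicit keys rather than the default provider chain.
val credentials = new BasicAWSCredentials("<your-access-key>", "<your-secret-key>")  // placeholders
val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com")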