From def8126d7788a8bd991ac6f9f9403de701a39dc5 Mon Sep 17 00:00:00 2001 From: Tathagata Das <tathagata.das1565@gmail.com> Date: Thu, 14 Feb 2013 17:49:43 -0800 Subject: [PATCH] Added TwitterInputDStream from example to StreamingContext. Renamed example TwitterBasic to TwitterPopularTags. --- ...erBasic.scala => TwitterPopularTags.scala} | 33 +++++------- project/SparkBuild.scala | 8 ++- .../spark/streaming/StreamingContext.scala | 52 +++++++++++++------ .../dstream}/TwitterInputDStream.scala | 5 +- 4 files changed, 53 insertions(+), 45 deletions(-) rename examples/src/main/scala/spark/streaming/examples/{twitter/TwitterBasic.scala => TwitterPopularTags.scala} (55%) rename {examples/src/main/scala/spark/streaming/examples/twitter => streaming/src/main/scala/spark/streaming/dstream}/TwitterInputDStream.scala (94%) diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala similarity index 55% rename from examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala rename to examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala index 377bc0c98e..fdb3a4c73c 100644 --- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala @@ -1,19 +1,19 @@ -package spark.streaming.examples.twitter +package spark.streaming.examples -import spark.streaming.StreamingContext._ import spark.streaming.{Seconds, StreamingContext} +import StreamingContext._ import spark.SparkContext._ -import spark.storage.StorageLevel /** * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter * stream. The stream is instantiated with credentials and optionally filters supplied by the * command line arguments. + * */ -object TwitterBasic { +object TwitterPopularTags { def main(args: Array[String]) { if (args.length < 3) { - System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" + + System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" + " [filter1] [filter2] ... [filter n]") System.exit(1) } @@ -21,10 +21,8 @@ object TwitterBasic { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2)) - val stream = new TwitterInputDStream(ssc, username, password, filters, - StorageLevel.MEMORY_ONLY_SER) - ssc.registerInputStream(stream) + val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2)) + val stream = ssc.twitterStream(username, password, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) @@ -39,22 +37,17 @@ object TwitterBasic { // Print popular hashtags topCounts60.foreach(rdd => { - if (rdd.count() != 0) { - val topList = rdd.take(5) - println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - } + val topList = rdd.take(5) + println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) topCounts10.foreach(rdd => { - if (rdd.count() != 0) { - val topList = rdd.take(5) - println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - } + val topList = rdd.take(5) + println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) ssc.start() } - } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index af8b5ba017..c6d3cc8b15 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -154,10 +154,7 @@ object SparkBuild extends Build { ) def examplesSettings = sharedSettings ++ Seq( - name := "spark-examples", - libraryDependencies ++= Seq( - "org.twitter4j" % "twitter4j-stream" % "3.0.3" - ) + name := "spark-examples" ) def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel") @@ -166,7 +163,8 @@ object SparkBuild extends Build { name := "spark-streaming", libraryDependencies ++= Seq( "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile", - "com.github.sgroschupf" % "zkclient" % "0.1" + "com.github.sgroschupf" % "zkclient" % "0.1", + "org.twitter4j" % "twitter4j-stream" % "3.0.3" ) ) ++ assemblySettings ++ extraAssemblySettings diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 8cfbec51d2..9be9d884be 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -17,6 +17,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import java.util.UUID +import twitter4j.Status /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -30,14 +31,14 @@ class StreamingContext private ( ) extends Logging { /** - * Creates a StreamingContext using an existing SparkContext. + * Create a StreamingContext using an existing SparkContext. * @param sparkContext Existing SparkContext * @param batchDuration The time interval at which streaming data will be divided into batches */ def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration) /** - * Creates a StreamingContext by providing the details necessary for creating a new SparkContext. + * Create a StreamingContext by providing the details necessary for creating a new SparkContext. * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). * @param frameworkName A name for your job, to display on the cluster web UI * @param batchDuration The time interval at which streaming data will be divided into batches @@ -46,7 +47,7 @@ class StreamingContext private ( this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration) /** - * Re-creates a StreamingContext from a checkpoint file. + * Re-create a StreamingContext from a checkpoint file. * @param path Path either to the directory that was specified as the checkpoint directory, or * to the checkpoint file 'graph' or 'graph.bk'. */ @@ -101,12 +102,12 @@ class StreamingContext private ( protected[streaming] var scheduler: Scheduler = null /** - * Returns the associated Spark context + * Return the associated Spark context */ def sparkContext = sc /** - * Sets each DStreams in this context to remember RDDs it generated in the last given duration. + * Set each DStreams in this context to remember RDDs it generated in the last given duration. * DStreams remember RDDs only for a limited duration of time and releases them for garbage * collection. This method allows the developer to specify how to long to remember the RDDs ( * if the developer wishes to query old data outside the DStream computation). @@ -117,7 +118,7 @@ class StreamingContext private ( } /** - * Sets the context to periodically checkpoint the DStream operations for master + * Set the context to periodically checkpoint the DStream operations for master * fault-tolerance. By default, the graph will be checkpointed every batch interval. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored * @param interval checkpoint interval @@ -200,7 +201,7 @@ class StreamingContext private ( } /** - * Creates a input stream from a Flume source. + * Create a input stream from a Flume source. * @param hostname Hostname of the slave machine to which the flume data will be sent * @param port Port of the slave machine to which the flume data will be sent * @param storageLevel Storage level to use for storing the received objects @@ -236,7 +237,7 @@ class StreamingContext private ( } /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. * File names starting with . are ignored. * @param directory HDFS directory to monitor for new file @@ -255,7 +256,7 @@ class StreamingContext private ( } /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. * @param directory HDFS directory to monitor for new file * @param filter Function to filter paths to process @@ -274,9 +275,8 @@ class StreamingContext private ( inputStream } - /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value * as Text and input format as TextInputFormat). File names starting with . are ignored. * @param directory HDFS directory to monitor for new file @@ -286,7 +286,25 @@ class StreamingContext private ( } /** - * Creates an input stream from a queue of RDDs. In each batch, + * Create a input stream that returns tweets received from Twitter. + * @param username Twitter username + * @param password Twitter password + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + username: String, + password: String, + filters: Seq[String], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel) + registerInputStream(inputStream) + inputStream + } + + /** + * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * @param queue Queue of RDDs * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval @@ -300,7 +318,7 @@ class StreamingContext private ( } /** - * Creates an input stream from a queue of RDDs. In each batch, + * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * @param queue Queue of RDDs * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval @@ -325,7 +343,7 @@ class StreamingContext private ( } /** - * Registers an input stream that will be started (InputDStream.start() called) to get the + * Register an input stream that will be started (InputDStream.start() called) to get the * input data. */ def registerInputStream(inputStream: InputDStream[_]) { @@ -333,7 +351,7 @@ class StreamingContext private ( } /** - * Registers an output stream that will be computed every interval + * Register an output stream that will be computed every interval */ def registerOutputStream(outputStream: DStream[_]) { graph.addOutputStream(outputStream) @@ -351,7 +369,7 @@ class StreamingContext private ( } /** - * Starts the execution of the streams. + * Start the execution of the streams. */ def start() { if (checkpointDir != null && checkpointDuration == null && graph != null) { @@ -379,7 +397,7 @@ class StreamingContext private ( } /** - * Stops the execution of the streams. + * Stop the execution of the streams. */ def stop() { try { diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala similarity index 94% rename from examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala rename to streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala index 99ed4cdc1c..d733254ddb 100644 --- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -1,4 +1,4 @@ -package spark.streaming.examples.twitter +package spark.streaming.dstream import spark._ import spark.streaming._ @@ -6,7 +6,6 @@ import dstream.{NetworkReceiver, NetworkInputDStream} import storage.StorageLevel import twitter4j._ import twitter4j.auth.BasicAuthorization -import collection.JavaConversions._ /* A stream of Twitter statuses, potentially filtered by one or more keywords. * @@ -50,7 +49,7 @@ class TwitterReceiver( def onTrackLimitationNotice(i: Int) {} def onScrubGeo(l: Long, l1: Long) {} def onStallWarning(stallWarning: StallWarning) {} - def onException(e: Exception) {} + def onException(e: Exception) { stopOnError(e) } }) val query: FilterQuery = new FilterQuery -- GitLab