diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 6ef2ac477a4f2e34d838d5f126f398c02de964db..618c7afc37c2c252af42a7ce1b1492da7b53242c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -115,7 +115,8 @@ object SparkBuild extends Build {
       "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
       "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
       "Spray Repository" at "http://repo.spray.cc/",
-      "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
+      "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
+      "Twitter4J Repository" at "http://twitter4j.org/maven2/"
     ),
 
     libraryDependencies ++= Seq(
@@ -133,6 +134,8 @@ object SparkBuild extends Build {
       "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
+      "org.twitter4j" % "twitter4j-core" % "3.0.2",
+      "org.twitter4j" % "twitter4j-stream" % "3.0.2",
       "cc.spray" % "spray-can" % "1.0-M2.1",
       "cc.spray" % "spray-server" % "1.0-M2.1",
       "org.apache.mesos" % "mesos" % "0.9.0-incubating"
diff --git a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5d177e96dec5a80ef6fb84885f64ab456c409914
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala
@@ -0,0 +1,59 @@
+package spark.streaming
+
+import spark.RDD
+import spark.streaming.{Time, InputDStream}
+import twitter4j._
+import twitter4j.auth.BasicAuthorization
+import collection.mutable.ArrayBuffer
+import collection.JavaConversions._
+
+/* A stream of Twitter statuses, potentially filtered by one or more keywords.
+*
+* @constructor create a new Twitter stream using the supplied username and password to authenticate.
+* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
+* such that this may return a sampled subset of all tweets during each interval.
+*/
+class TwitterInputDStream(
+    @transient ssc_ : StreamingContext,
+    username: String,
+    password: String,
+    filters: Seq[String]
+    ) extends InputDStream[Status](ssc_) {
+  val statuses: ArrayBuffer[Status] = ArrayBuffer()
+  var twitterStream: TwitterStream = _
+
+  override def start() = {
+    twitterStream = new TwitterStreamFactory()
+      .getInstance(new BasicAuthorization(username, password))
+    twitterStream.addListener(new StatusListener {
+      def onStatus(status: Status) = {
+        statuses += status
+      }
+      // Unimplemented
+      def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+      def onTrackLimitationNotice(i: Int) {}
+      def onScrubGeo(l: Long, l1: Long) {}
+      def onStallWarning(stallWarning: StallWarning) {}
+      def onException(e: Exception) {}
+    })
+
+    val query: FilterQuery = new FilterQuery
+    if (filters.size > 0) {
+      query.track(filters.toArray)
+      twitterStream.filter(query)
+    } else {
+      twitterStream.sample()
+    }
+  }
+
+  override def stop() = {
+    twitterStream.shutdown()
+  }
+
+  override def compute(validTime: Time): Option[RDD[Status]] = {
+    // Flush the current tweet buffer
+    val rdd = Some(ssc.sc.parallelize(statuses))
+    statuses.foreach(x => statuses -= x)
+    rdd
+  }
+}
diff --git a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala
new file mode 100644
index 0000000000000000000000000000000000000000..c7e380fbe11818194c133957cd53ed4a172d392d
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala
@@ -0,0 +1,37 @@
+package spark.streaming.examples
+
+import spark.streaming.StreamingContext._
+import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext}
+
+object TwitterBasic {
+  def main(args: Array[String]) {
+    if (args.length != 3) {
+      System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>")
+      System.exit(1)
+    }
+
+    val Array(master, username, password) = args
+
+    val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
+    val stream = new TwitterInputDStream(ssc, username, password, Seq())
+    ssc.graph.addInputStream(stream)
+
+    val hashTags = stream.flatMap(
+      status => status.getText.split(" ").filter(_.startsWith("#")))
+
+    // Word count over hashtags
+    val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
+
+    // TODO: Sorts on one node - should do with global sorting once streaming supports it
+    val topCounts = counts.collect().map(_.sortBy(-_._2).take(5))
+
+    topCounts.foreachRDD(rdd => {
+      val topList = rdd.take(1)(0)
+      println("\nPopular topics in last 60 seconds:")
+      topList.foreach(t => println("%s (%s tweets)".format(t._1, t._2)))
+    }
+    )
+
+    ssc.start()
+  }
+}
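
For reference, below is a minimal driver sketch that is not part of this patch: the TwitterPrint object name and its keyword argument are illustrative assumptions. It shows how the new TwitterInputDStream can be constructed with a non-empty filter list, so the twitterStream.filter(query) branch is exercised instead of the sampled firehose used by TwitterBasic. It relies only on APIs that appear in the diff above (StreamingContext, ssc.graph.addInputStream, map/foreachRDD, ssc.start).

package spark.streaming.examples

import spark.streaming.{Seconds, StreamingContext, TwitterInputDStream}

// Hypothetical example driver: track a single keyword and print matching tweet text each batch.
object TwitterPrint {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: TwitterPrint <master> <twitter_username> <twitter_password> <keyword>")
      System.exit(1)
    }
    val Array(master, username, password, keyword) = args

    val ssc = new StreamingContext(master, "TwitterPrint", Seconds(2))

    // A non-empty filter list makes the DStream call twitterStream.filter(query)
    // rather than sampling the full stream.
    val stream = new TwitterInputDStream(ssc, username, password, Seq(keyword))
    ssc.graph.addInputStream(stream)

    // Print up to ten matching tweets on the driver for each 2-second batch.
    stream.map(_.getText).foreachRDD(rdd => rdd.take(10).foreach(println))

    ssc.start()
  }
}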