diff --git a/examples/pom.xml b/examples/pom.xml index 28da3dbde4a1b83be0b380f1adfd55c26d5024d0..7d975875fac3afe8e733c3549bcc4437d0f236f6 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -19,17 +19,11 @@ <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> </dependency> - <dependency> - <groupId>org.twitter4j</groupId> - <artifactId>twitter4j-stream</artifactId> - <version>3.0.3</version> - </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>algebird-core_2.9.2</artifactId> <version>0.1.8</version> </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.version}</artifactId> diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala similarity index 64% rename from examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala rename to examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala index 023a0add804d20c01bf00dc0bf34a3b5a8a55fc5..c2095f5b9459b0a6cfa5a421a0b872d3f3cf0f97 100644 --- a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -1,4 +1,4 @@ -package spark.streaming.examples.twitter +package spark.streaming.examples import spark.streaming.{Seconds, StreamingContext} import spark.storage.StorageLevel @@ -7,44 +7,43 @@ import com.twitter.algebird.HyperLogLogMonoid import spark.streaming.dstream.TwitterInputDStream /** - * Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's - * TwitterInputDStream + * Example using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's + * TwitterInputDStream to compute approximate distinct counts of userids. */ -object StreamingHLL { +object TwitterAlgebirdHLL { def main(args: Array[String]) { if (args.length < 3) { - System.err.println("Usage: TwitterStreamingHLL <master> <twitter_username> <twitter_password>" + + System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" + " [filter1] [filter2] ... [filter n]") System.exit(1) } + /** Bit size parameter for HyperLogLog */ + val BIT_SIZE = 12 val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2)) - val stream = new TwitterInputDStream(ssc, username, password, filters, - StorageLevel.MEMORY_ONLY_SER) - ssc.registerInputStream(stream) + val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5)) + val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) - val globalHll = new HyperLogLogMonoid(12) + var globalHll = new HyperLogLogMonoid(BIT_SIZE).zero var userSet: Set[Long] = Set() val approxUsers = users.mapPartitions(ids => { - val hll = new HyperLogLogMonoid(12) + val hll = new HyperLogLogMonoid(BIT_SIZE) ids.map(id => hll(id)) }).reduce(_ + _) val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) - var h = globalHll.zero approxUsers.foreach(rdd => { if (rdd.count() != 0) { val partial = rdd.first() - h += partial + globalHll += partial println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) - println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt)) + println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) } }) @@ -54,7 +53,7 @@ object StreamingHLL { userSet ++= partial println("Exact distinct users this batch: %d".format(partial.size)) println("Exact distinct users overall: %d".format(userSet.size)) - println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100)) + println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100)) } }) diff --git a/streaming/pom.xml b/streaming/pom.xml index 6ee7e59df39d16b1bf47ad23604bfe7f80f40f53..d78c39da0d1a60a1bdbf928c3981392f1bbc11c1 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -47,6 +47,16 @@ <artifactId>zkclient</artifactId> <version>0.1</version> </dependency> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-stream</artifactId> + <version>3.0.3</version> + </dependency> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-core</artifactId> + <version>3.0.3</version> + </dependency> <dependency> <groupId>org.scalatest</groupId>