diff --git a/examples/pom.xml b/examples/pom.xml index b8c020a3211dd6fcb5bcc86b99f6c700301d4798..b97e6af288f34f11740aefed90f63af5f44c8b08 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -32,13 +32,20 @@ <url>http://spark.incubator.apache.org/</url> <repositories> - <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 --> <repository> - <id>lib</id> - <url>file://${project.basedir}/lib</url> + <id>apache-repo</id> + <name>Apache Repository</name> + <url>https://repository.apache.org/content/repositories/releases</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>false</enabled> + </snapshots> </repository> </repositories> + <dependencies> <dependency> <groupId>org.apache.spark</groupId> @@ -81,9 +88,18 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka</artifactId> - <version>0.7.2-spark</version> <!-- Comes from our in-project repository --> - <scope>provided</scope> + <artifactId>kafka_2.9.2</artifactId> + <version>0.8.0-beta1</version> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java new file mode 100644 index 0000000000000000000000000000000000000000..9a8e4209eddc7da8d87401dafe81b46f3456065a --- /dev/null +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.examples; + +import java.util.Map; +import java.util.HashMap; + +import com.google.common.collect.Lists; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import scala.Tuple2; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads> + * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. 
+ * <zkQuorum> is a list of one or more zookeeper servers that make quorum + * <group> is the name of kafka consumer group + * <topics> is a list of one or more kafka topics to consume from + * <numThreads> is the number of threads the kafka consumer should use + * + * Example: + * `./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02, + * zoo03 my-consumer-group topic1,topic2 1` + */ + +public class JavaKafkaWordCount { + public static void main(String[] args) { + if (args.length < 5) { + System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>"); + System.exit(1); + } + + // Create the context with a 1 second batch size + JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount", + new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); + + int numThreads = Integer.parseInt(args[4]); + Map<String, Integer> topicMap = new HashMap<String, Integer>(); + String[] topics = args[3].split(","); + for (String topic: topics) { + topicMap.put(topic, numThreads); + } + + JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap); + + JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { + @Override + public String call(Tuple2<String, String> tuple2) throws Exception { + return tuple2._2(); + } + }); + + JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterable<String> call(String x) { + return Lists.newArrayList(x.split(" ")); + } + }); + + JavaPairDStream<String, Integer> wordCounts = words.map( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2<String, Integer> call(String s) throws Exception { + return new Tuple2<String, Integer>(s, 1); + } + }).reduceByKey(new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + }); + + wordCounts.print(); + ssc.start(); + } +} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 12f939d5a7e4b529c05c8dfe77d67e8c3f254b40..570ba4c81a1d2c3cc00144727f04bc8a45dc071f 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -18,13 +18,11 @@ package org.apache.spark.streaming.examples import java.util.Properties -import kafka.message.Message -import kafka.producer.SyncProducerConfig + import kafka.producer._ -import org.apache.spark.SparkContext + import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.util.RawTextHelper._ /** @@ -54,9 +52,10 @@ object KafkaWordCount { ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = ssc.kafkaStream(zkQuorum, group, topicpMap) + val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) + val wordCounts = words.map(x => (x, 1l)) + .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) wordCounts.print() ssc.start() @@ -68,15 +67,16 @@ object KafkaWordCountProducer { def main(args: Array[String]) 
{ if (args.length < 2) { - System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>") + System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " + + "<messagesPerSec> <wordsPerMessage>") System.exit(1) } - val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args + val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args // Zookeper connection properties val props = new Properties() - props.put("zk.connect", zkQuorum) + props.put("metadata.broker.list", brokers) props.put("serializer.class", "kafka.serializer.StringEncoder") val config = new ProducerConfig(props) @@ -85,11 +85,13 @@ object KafkaWordCountProducer { // Send some messages while(true) { val messages = (1 to messagesPerSec.toInt).map { messageNum => - (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ") + val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) + .mkString(" ") + + new KeyedMessage[String, String](topic, str) }.toArray - println(messages.mkString(",")) - val data = new ProducerData[String, String](topic, messages) - producer.send(data) + + producer.send(messages: _*) Thread.sleep(100) } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 973f1e2f11d8d40d9e8f3bb8936124d5e5b1639f..b14970942b39bd95206ee96cff178f8675d2ab03 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -273,13 +273,16 @@ object SparkBuild extends Build { def streamingSettings = sharedSettings ++ Seq( name := "spark-streaming", resolvers ++= Seq( - "Akka Repository" at "http://repo.akka.io/releases/" + "Akka Repository" at "http://repo.akka.io/releases/", + "Apache repo" at "https://repository.apache.org/content/repositories/releases" ), libraryDependencies ++= Seq( "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy), - "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), - "com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty) + "com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty), + "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1" + exclude("com.sun.jdmk", "jmxtools") + exclude("com.sun.jmx", "jmxri") ) ) diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar deleted file mode 100644 index 65f79925a4d06a41b7b98d7e6f92fedb408c9b3a..0000000000000000000000000000000000000000 Binary files a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar and /dev/null differ diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 deleted file mode 100644 index 29f45f4adb6975e36cb95ee2f50de53ef76b0c5b..0000000000000000000000000000000000000000 --- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -18876b8bc2e4cef28b6d191aa49d963f \ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 deleted file mode 100644 index e3bd62bac038f6e56f7bbaf554a544f441dfbbaa..0000000000000000000000000000000000000000 --- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 +++ 
/dev/null @@ -1 +0,0 @@ -06b27270ffa52250a2c08703b397c99127b72060 \ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom deleted file mode 100644 index 082d35726a5afe5edb882bc4caac7bc6a7aafdc0..0000000000000000000000000000000000000000 --- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom +++ /dev/null @@ -1,9 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <modelVersion>4.0.0</modelVersion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka</artifactId> - <version>0.7.2-spark</version> - <description>POM was created from install:install-file</description> -</project> diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 deleted file mode 100644 index 92c4132b5b01c48b0e17458876a7beeeed3e3084..0000000000000000000000000000000000000000 --- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -7bc4322266e6032bdf9ef6eebdd8097d \ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 deleted file mode 100644 index 8a1d8a097a113cf674588322a978235f73c0d7f4..0000000000000000000000000000000000000000 --- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -d0f79e8eff0db43ca7bcf7dce2c8cd2972685c9d \ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml deleted file mode 100644 index 720cd51c2f5e6408a63eae37c92f52210de58d55..0000000000000000000000000000000000000000 --- a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml +++ /dev/null @@ -1,12 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<metadata> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka</artifactId> - <versioning> - <release>0.7.2-spark</release> - <versions> - <version>0.7.2-spark</version> - </versions> - <lastUpdated>20130121015225</lastUpdated> - </versioning> -</metadata> diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 deleted file mode 100644 index a4ce5dc9e8d9b21320106941f0f8b36c3e2b6485..0000000000000000000000000000000000000000 --- a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 +++ /dev/null @@ -1 +0,0 @@ -e2b9c7c5f6370dd1d21a0aae5e8dcd77 \ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 deleted file mode 100644 index b869eaf2a61de2fbaa3f75c2e6c9df874ac92a5e..0000000000000000000000000000000000000000 --- a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 +++ /dev/null @@ -1 +0,0 @@ -2a4341da936b6c07a09383d17ffb185ac558ee91 \ No newline at end of file diff --git a/streaming/pom.xml b/streaming/pom.xml index 3b25fb49fbedb694b699ee0195257b7fc51d2d5c..14c043175d0ee129c504af34fc90013b7568e00b 100644 --- a/streaming/pom.xml +++ 
b/streaming/pom.xml @@ -32,10 +32,16 @@ <url>http://spark.incubator.apache.org/</url> <repositories> - <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 --> <repository> - <id>lib</id> - <url>file://${project.basedir}/lib</url> + <id>apache-repo</id> + <name>Apache Repository</name> + <url>https://repository.apache.org/content/repositories/releases</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>false</enabled> + </snapshots> </repository> </repositories> @@ -56,9 +62,18 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka</artifactId> - <version>0.7.2-spark</version> <!-- Comes from our in-project repository --> - <scope>provided</scope> + <artifactId>kafka_2.9.2</artifactId> + <version>0.8.0-beta1</version> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.flume</groupId> @@ -71,11 +86,6 @@ </exclusion> </exclusions> </dependency> - <dependency> - <groupId>com.github.sgroschupf</groupId> - <artifactId>zkclient</artifactId> - <version>0.1</version> - </dependency> <dependency> <groupId>org.twitter4j</groupId> <artifactId>twitter4j-stream</artifactId> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 878725c705db71807c567c909b1b6ab0f48f7f94..dc60046805fd0ba5d99f40d9cf6516ceae7e9c86 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -252,10 +252,14 @@ class StreamingContext private ( groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 - ): DStream[String] = { + ): DStream[(String, String)] = { val kafkaParams = Map[String, String]( - "zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000") - kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, storageLevel) + "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, + "zookeeper.connection.timeout.ms" -> "10000") + kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder]( + kafkaParams, + topics, + storageLevel) } /** @@ -266,12 +270,16 @@ class StreamingContext private ( * in its own thread. 
* @param storageLevel Storage level to use for storing the received objects */ - def kafkaStream[T: ClassManifest, D <: kafka.serializer.Decoder[_]: Manifest]( + def kafkaStream[ + K: ClassManifest, + V: ClassManifest, + U <: kafka.serializer.Decoder[_]: Manifest, + T <: kafka.serializer.Decoder[_]: Manifest]( kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ): DStream[T] = { - val inputStream = new KafkaInputDStream[T, D](this, kafkaParams, topics, storageLevel) + ): DStream[(K, V)] = { + val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel) registerInputStream(inputStream) inputStream } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 54ba3e602590f806906f75f45ec6df22bde66da3..6423b916b0ac48abaceafa7a874c8d1ff5295f6a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -141,7 +141,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { zkQuorum: String, groupId: String, topics: JMap[String, JInt]) - : JavaDStream[String] = { + : JavaPairDStream[String, String] = { implicit val cmt: ClassManifest[String] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), @@ -162,7 +162,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { groupId: String, topics: JMap[String, JInt], storageLevel: StorageLevel) - : JavaDStream[String] = { + : JavaPairDStream[String, String] = { implicit val cmt: ClassManifest[String] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), @@ -171,25 +171,34 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create an input stream that pulls messages form a Kafka Broker. - * @param typeClass Type of RDD - * @param decoderClass Type of kafka decoder + * @param keyTypeClass Key type of RDD + * @param valueTypeClass value type of RDD + * @param keyDecoderClass Type of kafka key decoder + * @param valueDecoderClass Type of kafka value decoder * @param kafkaParams Map of kafka configuration paramaters. * See: http://kafka.apache.org/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. * @param storageLevel RDD storage level. 
Defaults to memory-only */ - def kafkaStream[T, D <: kafka.serializer.Decoder[_]]( - typeClass: Class[T], - decoderClass: Class[D], + def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]]( + keyTypeClass: Class[K], + valueTypeClass: Class[V], + keyDecoderClass: Class[U], + valueDecoderClass: Class[T], kafkaParams: JMap[String, String], topics: JMap[String, JInt], storageLevel: StorageLevel) - : JavaDStream[T] = { - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]] - ssc.kafkaStream[T, D]( + : JavaPairDStream[K, V] = { + implicit val keyCmt: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val valueCmt: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + + implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] + implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] + + ssc.kafkaStream[K, V, U, T]( kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala index 51e913675d24acbfd04534e6d94ed2edc2cdc4af..a5de5e1fb549c9e5ee6c75adb203200ffbdea09a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala @@ -19,22 +19,18 @@ package org.apache.spark.streaming.dstream import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Time, DStreamCheckpointData, StreamingContext} +import org.apache.spark.streaming.StreamingContext import java.util.Properties import java.util.concurrent.Executors import kafka.consumer._ -import kafka.message.{Message, MessageSet, MessageAndMetadata} import kafka.serializer.Decoder -import kafka.utils.{Utils, ZKGroupTopicDirs} -import kafka.utils.ZkUtils._ +import kafka.utils.VerifiableProperties import kafka.utils.ZKStringSerializer import org.I0Itec.zkclient._ import scala.collection.Map -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ /** @@ -46,25 +42,32 @@ import scala.collection.JavaConversions._ * @param storageLevel RDD storage level. 
*/ private[streaming] -class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest]( +class KafkaInputDStream[ + K: ClassManifest, + V: ClassManifest, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( @transient ssc_ : StreamingContext, kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_ ) with Logging { + ) extends NetworkInputDStream[(K, V)](ssc_) with Logging { - - def getReceiver(): NetworkReceiver[T] = { - new KafkaReceiver[T, D](kafkaParams, topics, storageLevel) - .asInstanceOf[NetworkReceiver[T]] + def getReceiver(): NetworkReceiver[(K, V)] = { + new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) + .asInstanceOf[NetworkReceiver[(K, V)]] } } private[streaming] -class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel +class KafkaReceiver[ + K: ClassManifest, + V: ClassManifest, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel ) extends NetworkReceiver[Any] { // Handles pushing data into the BlockManager @@ -83,27 +86,34 @@ class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest]( // In case we are using multiple Threads to handle Kafka Messages val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) - logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid")) + logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) // Kafka connection properties val props = new Properties() kafkaParams.foreach(param => props.put(param._1, param._2)) // Create the connection to the cluster - logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect")) + logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect")) val consumerConfig = new ConsumerConfig(props) consumerConnector = Consumer.create(consumerConfig) - logInfo("Connected to " + kafkaParams("zk.connect")) + logInfo("Connected to " + kafkaParams("zookeeper.connect")) // When autooffset.reset is defined, it is our responsibility to try and whack the // consumer group zk node. 
- if (kafkaParams.contains("autooffset.reset")) { - tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid")) + if (kafkaParams.contains("auto.offset.reset")) { + tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) } // Create Threads for each Topic/Message Stream we are listening - val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]] - val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder) + val keyDecoder = manifest[U].erasure.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[K]] + val valueDecoder = manifest[T].erasure.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[V]] + + val topicMessageStreams = consumerConnector.createMessageStreams( + topics, keyDecoder, valueDecoder) // Start the messages handler for each partition topicMessageStreams.values.foreach { streams => @@ -112,11 +122,12 @@ class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest]( } // Handles Kafka Messages - private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable { + private class MessageHandler[K: ClassManifest, V: ClassManifest](stream: KafkaStream[K, V]) + extends Runnable { def run() { logInfo("Starting MessageHandler.") for (msgAndMetadata <- stream) { - blockGenerator += msgAndMetadata.message + blockGenerator += (msgAndMetadata.key, msgAndMetadata.message) } } } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index c0d729ff8767373549991e872906a2b63d5fd05d..dc01f1e8aa0ca3ee0935378462c2fbe85bd20c85 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1220,14 +1220,20 @@ public class JavaAPISuite implements Serializable { @Test public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); - JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, + JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK()); HashMap<String, String> kafkaParams = Maps.newHashMap(); - kafkaParams.put("zk.connect","localhost:12345"); - kafkaParams.put("groupid","consumer-group"); - JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, + kafkaParams.put("zookeeper.connect","localhost:12345"); + kafkaParams.put("group.id","consumer-group"); + JavaPairDStream<String, String> test3 = ssc.kafkaStream( + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topics, StorageLevel.MEMORY_AND_DISK()); } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 42e3e51e3fa15b18de981fe46f40fe999786a276..c29b75ece69f05211be5dc977b0f2e860282cca0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -268,8 +268,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val test2 = 
ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK) // Test specifying decoder - val kafkaParams = Map("zk.connect"->"localhost:12345","groupid"->"consumer-group") - val test3 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK) + val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") + val test3 = ssc.kafkaStream[ + String, + String, + kafka.serializer.StringDecoder, + kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK) } }