Commit c6226334 authored by cody koeninger, committed by Tathagata Das

[SPARK-16212][STREAMING][KAFKA] code cleanup from review feedback

## What changes were proposed in this pull request?
Code cleanup in kafka-0-8 to match the suggested changes for the kafka-0-10 branch.

## How was this patch tested?
unit tests

Author: cody koeninger <cody@koeninger.org>

Closes #13908 from koeninger/kafka-0-8-cleanup.
parent 46395db8
DirectKafkaInputDStream.scala

@@ -34,7 +34,7 @@ import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
 
 /**
- * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * A stream of [[KafkaRDD]] where
  * each given Kafka topic/partition corresponds to an RDD partition.
  * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
  * of messages
@@ -43,7 +43,7 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
  * and this DStream is not responsible for committing offsets,
  * so that you can control exactly-once semantics.
  * For an easy interface to Kafka-managed offsets,
- * see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+ * see [[KafkaCluster]]
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
  * configuration parameters</a>.
  * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
@@ -132,7 +132,7 @@ class DirectKafkaInputDStream[
     if (retries <= 0) {
       throw new SparkException(err)
     } else {
-      log.error(err)
+      logError(err)
       Thread.sleep(kc.config.refreshLeaderBackoffMs)
       latestLeaderOffsets(retries - 1)
     }
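The log.error -> logError change (and the similar logInfo changes below) switches to the logging helpers that Spark classes mix in. A simplified sketch of the pattern those helpers follow, not the actual Spark Logging trait: the message is passed by name, so the interpolated string is only built when the corresponding level is enabled.

```scala
import org.slf4j.{Logger, LoggerFactory}

// Illustrative sketch only; names and details differ from Spark's internal Logging trait.
trait SketchLogging {
  @transient private lazy val log: Logger = LoggerFactory.getLogger(getClass)

  // By-name parameter: the s"..." interpolation is skipped when ERROR is disabled.
  protected def logError(msg: => String): Unit = {
    if (log.isErrorEnabled) log.error(msg)
  }

  protected def logInfo(msg: => String): Unit = {
    if (log.isInfoEnabled) log.info(msg)
  }
}
```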
@@ -194,7 +194,7 @@ class DirectKafkaInputDStream[
       data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
     }
 
-    override def update(time: Time) {
+    override def update(time: Time): Unit = {
       batchForTime.clear()
       generatedRDDs.foreach { kv =>
         val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
@@ -202,9 +202,9 @@ class DirectKafkaInputDStream[
       }
     }
 
-    override def cleanup(time: Time) { }
+    override def cleanup(time: Time): Unit = { }
 
-    override def restore() {
+    override def restore(): Unit = {
       // this is assuming that the topics don't change during execution, which is true currently
       val topics = fromOffsets.keySet
       val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
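The update, cleanup, and restore changes above drop Scala's procedure syntax in favor of an explicit `: Unit =` result type, the style the Spark codebase prefers (procedure syntax is deprecated in newer Scala versions). A tiny standalone illustration; the Example class is hypothetical:

```scala
class Example {
  // Procedure syntax: the Unit result type is implicit and easy to misread.
  def beforeStyle() { println("runs for its side effect") }

  // Explicit result type: same behavior, stated clearly.
  def afterStyle(): Unit = { println("runs for its side effect") }
}
```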
KafkaRDD.scala

@@ -129,7 +129,7 @@ class KafkaRDD[
     val part = thePart.asInstanceOf[KafkaRDDPartition]
     assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
     if (part.fromOffset == part.untilOffset) {
-      log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+      logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
         s"skipping ${part.topic} ${part.partition}")
       Iterator.empty
     } else {
@@ -137,13 +137,16 @@ class KafkaRDD[
     }
   }
 
+  /**
+   * An iterator that fetches messages directly from Kafka for the offsets in partition.
+   */
   private class KafkaRDDIterator(
       part: KafkaRDDPartition,
       context: TaskContext) extends NextIterator[R] {
 
     context.addTaskCompletionListener{ context => closeIfNeeded() }
 
-    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
 
     val kc = new KafkaCluster(kafkaParams)
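The new scaladoc describes KafkaRDDIterator, which extends Spark's NextIterator and registers closeIfNeeded() as a task-completion listener so resources are released even if the task ends early. A self-contained sketch of that iterator pattern, simplified and not Spark's actual NextIterator implementation:

```scala
// Illustrative sketch of a NextIterator-style base class: subclasses implement
// getNext()/close(), and the base class turns them into hasNext/next.
abstract class SketchNextIterator[T] extends Iterator[T] {
  private var gotNext = false
  private var closed = false
  private var nextValue: Option[T] = None
  protected var finished = false

  /** Compute the next element, or set `finished = true` when there are no more. */
  protected def getNext(): T

  /** Release resources, e.g. the underlying Kafka consumer. */
  protected def close(): Unit

  /** Safe to call multiple times, e.g. from a task-completion listener. */
  def closeIfNeeded(): Unit = {
    if (!closed) { closed = true; close() }
  }

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      val v = getNext()
      if (!finished) { nextValue = Some(v); gotNext = true }
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("end of stream")
    gotNext = false
    nextValue.get
  }
}
```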
@@ -177,7 +180,7 @@ class KafkaRDD[
       val err = resp.errorCode(part.topic, part.partition)
       if (err == ErrorMapping.LeaderNotAvailableCode ||
         err == ErrorMapping.NotLeaderForPartitionCode) {
-        log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
+        logError(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
           s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
         Thread.sleep(kc.config.refreshLeaderBackoffMs)
       }
JavaDirectKafkaStreamSuite.java

@@ -135,11 +135,6 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
         @Override
         public void call(JavaRDD<String> rdd) {
          result.addAll(rdd.collect());
-          for (OffsetRange o : offsetRanges.get()) {
-            System.out.println(
-              o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
-            );
-          }
         }
       }
     );
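The deleted test lines only printed each batch's offset ranges to stdout; the offsetRanges reference the loop read from is still populated elsewhere in the test. For reference, a hedged Scala sketch of capturing offset ranges without printing them (OffsetCaptureSketch, install, and latestRanges are illustrative names):

```scala
import java.util.concurrent.atomic.AtomicReference

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

object OffsetCaptureSketch {
  // Holds the ranges of the most recent batch; a test can assert on these
  // instead of printing them.
  val latestRanges = new AtomicReference[Array[OffsetRange]](Array.empty)

  def install(stream: DStream[(String, String)]): Unit = {
    stream.transform { rdd =>
      // HasOffsetRanges is only available on the RDD produced directly by the
      // direct stream, so grab the ranges before any other transformation.
      latestRanges.set(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
      rdd
    }.foreachRDD { rdd =>
      rdd.collect() // force evaluation of each batch
    }
  }
}
```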