diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index 92c296a9e6bd355341c6258fe363703dd51acc23..386066a85749f407e0a19e930f9c25958c21d9e0 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -91,7 +91,9 @@ The new Kafka consumer API will pre-fetch messages into buffers. Therefore it i
 
 In most cases, you should use `LocationStrategies.PreferConsistent` as shown above. This will distribute partitions evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use `PreferBrokers`, which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant skew in load among partitions, use `PreferFixed`. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).
 
-The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`
+The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`.
+
+If you would like to disable the caching for Kafka consumers, you can set `spark.streaming.kafka.consumer.cache.enabled` to `false`. Disabling the cache may be needed to work around the problem described in SPARK-19185. This property may be removed in later versions of Spark, once SPARK-19185 is resolved.
 
 The cache is keyed by topicpartition and group.id, so use a **separate** `group.id` for each call to `createDirectStream`.
 
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 6d6983c4bd419a732c69c85985f58fbe8bee7a8b..9a4a1cf32a480de7576c4a4926ac859b05098c11 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -213,8 +213,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
       val fo = currentOffsets(tp)
       OffsetRange(tp.topic, tp.partition, fo, uo)
     }
-    val rdd = new KafkaRDD[K, V](
-      context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
+    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
+      true)
+    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
+      getPreferredHosts, useConsumerCache)
 
     // Report the record number and metadata of this batch interval to InputInfoTracker.
     val description = offsetRanges.filter { offsetRange =>
@@ -316,7 +318,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
           b.map(OffsetRange(_)),
           getPreferredHosts,
           // during restore, it's possible same partition will be consumed from multiple
-          // threads, so dont use cache
+          // threads, so do not use cache.
           false
         )
       }
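Below is a minimal sketch of how a user application might opt out of the consumer cache once this patch is applied. The app name, batch interval, and overall setup are illustrative assumptions, not part of the patch; only the `spark.streaming.kafka.consumer.cache.enabled` property comes from the change above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical application setup; only the cache property below is
// introduced by this patch.
val conf = new SparkConf()
  .setAppName("KafkaConsumerCacheExample")
  // Disable the per-executor Kafka consumer cache, e.g. to work around
  // SPARK-19185. The default is true (caching enabled).
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")

val ssc = new StreamingContext(conf, Seconds(5))
```

Because `DirectKafkaInputDStream` reads the flag via `context.conf.getBoolean(...)` with a default of `true`, jobs that never set the property keep the existing caching behavior unchanged.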