From dedbceec1ef33ccd88101016de969a1ef3e3e142 Mon Sep 17 00:00:00 2001
From: cody koeninger <cody@koeninger.org>
Date: Wed, 29 Jun 2016 23:21:03 -0700
Subject: [PATCH] [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new
 Kafka 0.10 Consumer API

## What changes were proposed in this pull request?

Add a new external module, `external/kafka-0-10` (plus a matching assembly module), implementing Kafka DStream and RDD support on top of the new consumer API from the released Kafka 0.10.
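
A minimal usage sketch of the new API (illustration only, not part of this patch; it assumes the `KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)` overload added in `KafkaUtils.scala` and a broker at `localhost:9092`):

```scala
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-0-10-sketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",            // assumed broker address
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // PreferConsistent spreads partitions evenly over executors; Subscribe delegates
    // topic subscription to the new Kafka consumer.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](List("topicA", "topicB"), kafkaParams))

    stream.foreachRDD { rdd =>
      // HasOffsetRanges exposes the exact offsets backing each RDD partition.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.map(record => (record.key, record.value)).count()
      // CanCommitOffsets queues the ranges for an asynchronous commit back to Kafka.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```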

## How was this patch tested?

Unit tests, manual tests
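
Batch (RDD) access is exercised by `KafkaRDDSuite` and `JavaKafkaRDDSuite`. A hedged sketch of that usage, assuming `KafkaUtils.createRDD(sc, kafkaParams, offsetRanges, locationStrategy)` is the overload added in `KafkaUtils.scala`:

```scala
import java.{util => ju}

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010.{KafkaUtils, OffsetRange, PreferConsistent}

object KafkaBatchSketch {
  def countRecords(sc: SparkContext): Long = {
    val kafkaParams = new ju.HashMap[String, Object]()
    kafkaParams.put("bootstrap.servers", "localhost:9092")  // assumed broker address
    kafkaParams.put("key.deserializer", classOf[StringDeserializer])
    kafkaParams.put("value.deserializer", classOf[StringDeserializer])
    kafkaParams.put("group.id", "example-rdd-group")
    // KafkaRDD asserts these two settings for its executor params.
    kafkaParams.put("enable.auto.commit", java.lang.Boolean.FALSE)
    kafkaParams.put("auto.offset.reset", "none")

    // Offsets are fixed up front, so the RDD's contents are deterministic.
    val offsetRanges = Array(
      OffsetRange("topicA", 0, 0L, 100L),  // topic, partition, from (inclusive), until (exclusive)
      OffsetRange("topicA", 1, 0L, 100L))

    KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, PreferConsistent)
      .map(record => (record.key, record.value))
      .count()
  }
}
```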

Author: cody koeninger <cody@koeninger.org>

Closes #11863 from koeninger/kafka-0.9.
---
 external/kafka-0-10-assembly/pom.xml          | 176 +++++
 external/kafka-0-10/pom.xml                   |  98 +++
 .../kafka010/CachedKafkaConsumer.scala        | 189 ++++++
 .../streaming/kafka010/ConsumerStrategy.scala | 314 +++++++++
 .../kafka010/DirectKafkaInputDStream.scala    | 318 +++++++++
 .../spark/streaming/kafka010/KafkaRDD.scala   | 232 +++++++
 .../kafka010/KafkaRDDPartition.scala          |  45 ++
 .../streaming/kafka010/KafkaTestUtils.scala   | 277 ++++++++
 .../spark/streaming/kafka010/KafkaUtils.scala | 175 +++++
 .../streaming/kafka010/LocationStrategy.scala |  77 +++
 .../streaming/kafka010/OffsetRange.scala      | 153 +++++
 .../streaming/kafka010/package-info.java      |  21 +
 .../spark/streaming/kafka010/package.scala    |  23 +
 .../kafka010/JavaConsumerStrategySuite.java   |  84 +++
 .../kafka010/JavaDirectKafkaStreamSuite.java  | 180 ++++++
 .../streaming/kafka010/JavaKafkaRDDSuite.java | 122 ++++
 .../kafka010/JavaLocationStrategySuite.java   |  58 ++
 .../src/test/resources/log4j.properties       |  28 +
 .../kafka010/DirectKafkaStreamSuite.scala     | 612 ++++++++++++++++++
 .../streaming/kafka010/KafkaRDDSuite.scala    | 169 +++++
 pom.xml                                       |   2 +
 project/SparkBuild.scala                      |  12 +-
 22 files changed, 3359 insertions(+), 6 deletions(-)
 create mode 100644 external/kafka-0-10-assembly/pom.xml
 create mode 100644 external/kafka-0-10/pom.xml
 create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
 create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala
 create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
 create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
 create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
 create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java
 create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
 create mode 100644 external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
 create mode 100644 external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
 create mode 100644 external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
 create mode 100644 external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
 create mode 100644 external/kafka-0-10/src/test/resources/log4j.properties
 create mode 100644 external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 create mode 100644 external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala

diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml
new file mode 100644
index 0000000000..f2468d1cba
--- /dev/null
+++ b/external/kafka-0-10-assembly/pom.xml
@@ -0,0 +1,176 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-0-10-assembly_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Integration for Kafka 0.10 Assembly</name>
+  <url>http://spark.apache.org/</url>
+
+  <properties>
+    <sbt.project.name>streaming-kafka-0-10-assembly</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <!--
+      These dependencies are already included in the Spark assembly; demote them to provided scope.
+    -->
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.jpountz.lz4</groupId>
+      <artifactId>lz4</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <classifier>${avro.mapred.classifier}</classifier>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.java.dev.jets3t</groupId>
+      <artifactId>jets3t</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+  <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+  <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  <plugins>
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-shade-plugin</artifactId>
+      <configuration>
+        <shadedArtifactAttached>false</shadedArtifactAttached>
+        <artifactSet>
+          <includes>
+            <include>*:*</include>
+          </includes>
+        </artifactSet>
+        <filters>
+          <filter>
+            <artifact>*:*</artifact>
+            <excludes>
+              <exclude>META-INF/*.SF</exclude>
+              <exclude>META-INF/*.DSA</exclude>
+              <exclude>META-INF/*.RSA</exclude>
+            </excludes>
+          </filter>
+        </filters>
+      </configuration>
+      <executions>
+        <execution>
+          <phase>package</phase>
+          <goals>
+            <goal>shade</goal>
+          </goals>
+          <configuration>
+            <transformers>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                <resource>reference.conf</resource>
+              </transformer>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                <resource>log4j.properties</resource>
+              </transformer>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+            </transformers>
+          </configuration>
+        </execution>
+      </executions>
+    </plugin>
+  </plugins>
+</build>
+</project>
+
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
new file mode 100644
index 0000000000..50395f6d14
--- /dev/null
+++ b/external/kafka-0-10/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
+  <properties>
+    <sbt.project.name>streaming-kafka-0-10</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Integration for Kafka 0.10</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.binary.version}</artifactId>
+      <version>0.10.0.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.jopt-simple</groupId>
+          <artifactId>jopt-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
new file mode 100644
index 0000000000..fa3ea6131a
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer }
+import org.apache.kafka.common.{ KafkaException, TopicPartition }
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of a single TopicPartition, intended for cached reuse.
+ * The underlying consumer is not thread-safe, so neither is this one,
+ * but processing the same TopicPartition and group id in multiple threads is usually bad anyway.
+ */
+private[kafka010]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+    "groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val tps = new ju.ArrayList[TopicPartition]()
+    tps.add(topicPartition)
+    c.assign(tps)
+    c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
+   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+    logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
+    if (offset != nextOffset) {
+      logInfo(s"Initial fetch for $groupId $topic $partition $offset")
+      seek(offset)
+      poll(timeout)
+    }
+
+    if (!buffer.hasNext()) { poll(timeout) }
+    assert(buffer.hasNext(),
+      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
+    var record = buffer.next()
+
+    if (record.offset != offset) {
+      logInfo(s"Buffer miss for $groupId $topic $partition $offset")
+      seek(offset)
+      poll(timeout)
+      assert(buffer.hasNext(),
+        s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
+      record = buffer.next()
+      assert(record.offset == offset,
+        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
+    }
+
+    nextOffset = offset + 1
+    record
+  }
+
+  private def seek(offset: Long): Unit = {
+    logDebug(s"Seeking to $topicPartition $offset")
+    consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+    val p = consumer.poll(timeout)
+    val r = p.records(topicPartition)
+    logDebug(s"Polled ${p.partitions()}  ${r.size}")
+    buffer = r.iterator
+  }
+
+}
+
+private[kafka010]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null
+
+  /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
+  def init(
+      initialCapacity: Int,
+      maxCapacity: Int,
+      loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
+    if (null == cache) {
+      logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor")
+      cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
+        initialCapacity, loadFactor, true) {
+        override def removeEldestEntry(
+          entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = {
+          if (this.size > maxCapacity) {
+            try {
+              entry.getValue.consumer.close()
+            } catch {
+              case x: KafkaException =>
+                logError("Error closing oldest Kafka consumer", x)
+            }
+            true
+          } else {
+            false
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Get a cached consumer for groupId, assigned to topic and partition.
+   * If a matching consumer doesn't already exist, it will be created using kafkaParams.
+   */
+  def get[K, V](
+      groupId: String,
+      topic: String,
+      partition: Int,
+      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
+    CachedKafkaConsumer.synchronized {
+      val k = CacheKey(groupId, topic, partition)
+      val v = cache.get(k)
+      if (null == v) {
+        logInfo(s"Cache miss for $k")
+        logDebug(cache.keySet.toString)
+        val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
+        cache.put(k, c)
+        c
+      } else {
+        // any given topicpartition should have a consistent key and value type
+        v.asInstanceOf[CachedKafkaConsumer[K, V]]
+      }
+    }
+
+  /**
+   * Get a new instance that is not associated with the global cache.
+   * The caller is responsible for closing it.
+   */
+  def getUncached[K, V](
+      groupId: String,
+      topic: String,
+      partition: Int,
+      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
+    new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
+
+  /** Remove the consumer for the given groupId, topic, and partition, if it exists. */
+  def remove(groupId: String, topic: String, partition: Int): Unit = {
+    val k = CacheKey(groupId, topic, partition)
+    logInfo(s"Removing $k from cache")
+    val v = CachedKafkaConsumer.synchronized {
+      cache.remove(k)
+    }
+    if (null != v) {
+      v.close()
+      logInfo(s"Removed $k from cache")
+    }
+  }
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
new file mode 100644
index 0000000000..079a07dbc2
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * Choice of how to create and configure underlying Kafka Consumers on driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup after object
+ *  instantiation. This interface encapsulates that process, and allows it to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver
+   * has successfully read.  Will be empty on initial start, possibly non-empty on restart from
+   * checkpoint.
+   */
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * :: Experimental ::
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+@Experimental
+case class Subscribe[K, V] private(
+    topics: ju.Collection[java.lang.String],
+    kafkaParams: ju.Map[String, Object],
+    offsets: ju.Map[TopicPartition, Long]
+  ) extends ConsumerStrategy[K, V] {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
+    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    consumer.subscribe(topics)
+    if (currentOffsets.isEmpty) {
+      offsets.asScala.foreach { case (topicPartition, offset) =>
+          consumer.seek(topicPartition, offset)
+      }
+    }
+
+    consumer
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Companion object for creating [[Subscribe]] strategy
+ */
+@Experimental
+object Subscribe {
+  /**
+   *  :: Experimental ::
+   * Subscribe to a collection of topics.
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def apply[K, V](
+      topics: Iterable[java.lang.String],
+      kafkaParams: collection.Map[String, Object],
+      offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = {
+    Subscribe[K, V](
+      new ju.ArrayList(topics.asJavaCollection),
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      new ju.HashMap[TopicPartition, Long](offsets.asJava))
+  }
+
+  /**
+   *  :: Experimental ::
+   * Subscribe to a collection of topics.
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def apply[K, V](
+      topics: Iterable[java.lang.String],
+      kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = {
+    Subscribe[K, V](
+      new ju.ArrayList(topics.asJavaCollection),
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      ju.Collections.emptyMap[TopicPartition, Long]())
+  }
+
+  /**
+   *  :: Experimental ::
+   * Subscribe to a collection of topics.
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def create[K, V](
+      topics: ju.Collection[java.lang.String],
+      kafkaParams: ju.Map[String, Object],
+      offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = {
+    Subscribe[K, V](topics, kafkaParams, offsets)
+  }
+
+  /**
+   *  :: Experimental ::
+   * Subscribe to a collection of topics.
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def create[K, V](
+      topics: ju.Collection[java.lang.String],
+      kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = {
+    Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
+  }
+
+}
+
+/**
+ * :: Experimental ::
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+@Experimental
+case class Assign[K, V] private(
+    topicPartitions: ju.Collection[TopicPartition],
+    kafkaParams: ju.Map[String, Object],
+    offsets: ju.Map[TopicPartition, Long]
+  ) extends ConsumerStrategy[K, V] {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
+    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    consumer.assign(topicPartitions)
+    if (currentOffsets.isEmpty) {
+      offsets.asScala.foreach { case (topicPartition, offset) =>
+          consumer.seek(topicPartition, offset)
+      }
+    }
+
+    consumer
+  }
+}
+
+/**
+ *  :: Experimental ::
+ * Companion object for creating [[Assign]] strategy
+ */
+@Experimental
+object Assign {
+  /**
+   *  :: Experimental ::
+   * Assign a fixed collection of TopicPartitions
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def apply[K, V](
+      topicPartitions: Iterable[TopicPartition],
+      kafkaParams: collection.Map[String, Object],
+      offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = {
+    Assign[K, V](
+      new ju.ArrayList(topicPartitions.asJavaCollection),
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      new ju.HashMap[TopicPartition, Long](offsets.asJava))
+  }
+
+  /**
+   *  :: Experimental ::
+   * Assign a fixed collection of TopicPartitions
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def apply[K, V](
+      topicPartitions: Iterable[TopicPartition],
+      kafkaParams: collection.Map[String, Object]): Assign[K, V] = {
+    Assign[K, V](
+      new ju.ArrayList(topicPartitions.asJavaCollection),
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      ju.Collections.emptyMap[TopicPartition, Long]())
+  }
+
+  /**
+   *  :: Experimental ::
+   * Assign a fixed collection of TopicPartitions
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def create[K, V](
+      topicPartitions: ju.Collection[TopicPartition],
+      kafkaParams: ju.Map[String, Object],
+      offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = {
+    Assign[K, V](topicPartitions, kafkaParams, offsets)
+  }
+
+  /**
+   *  :: Experimental ::
+   * Assign a fixed collection of TopicPartitions
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def create[K, V](
+      topicPartitions: ju.Collection[TopicPartition],
+      kafkaParams: ju.Map[String, Object]): Assign[K, V] = {
+    Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
+  }
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
new file mode 100644
index 0000000000..acd1841d53
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A DStream where each given Kafka topic/partition corresponds
+ * to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition
+ * gives the maximum number of messages per second that
+ * each '''partition''' will accept.
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ *   see [[LocationStrategy]] for more details.
+ * @param executorKafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param consumerStrategy In most cases, pass in [[Subscribe]],
+ *   see [[ConsumerStrategy]] for more details
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+private[spark] class DirectKafkaInputDStream[K, V](
+    _ssc: StreamingContext,
+    locationStrategy: LocationStrategy,
+    consumerStrategy: ConsumerStrategy[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
+
+  val executorKafkaParams = {
+    val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
+    KafkaUtils.fixKafkaParams(ekp)
+    ekp
+  }
+
+  protected var currentOffsets = Map[TopicPartition, Long]()
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+    if (null == kc) {
+      kc = consumerStrategy.onStart(currentOffsets)
+    }
+    kc
+  }
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
+    logError("Kafka ConsumerRecord is not serializable. " +
+      "Use .map to extract fields before calling .persist or .window")
+    super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+    val c = consumer
+    val result = new ju.HashMap[TopicPartition, String]()
+    val hosts = new ju.HashMap[TopicPartition, String]()
+    val assignments = c.assignment().iterator()
+    while (assignments.hasNext()) {
+      val tp: TopicPartition = assignments.next()
+      if (null == hosts.get(tp)) {
+        val infos = c.partitionsFor(tp.topic).iterator()
+        while (infos.hasNext()) {
+          val i = infos.next()
+          hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host())
+        }
+      }
+      result.put(tp, hosts.get(tp))
+    }
+    result
+  }
+
+  protected def getPreferredHosts: ju.Map[TopicPartition, String] = {
+    locationStrategy match {
+      case PreferBrokers => getBrokers
+      case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
+      case PreferFixed(hostMap) => hostMap
+    }
+  }
+
+  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+  private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]"
+
+  protected[streaming] override val checkpointData =
+    new DirectKafkaInputDStreamCheckpointData
+
+
+  /**
+   * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+   */
+  override protected[streaming] val rateController: Option[RateController] = {
+    if (RateController.isBackPressureEnabled(ssc.conf)) {
+      Some(new DirectKafkaRateController(id,
+        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
+    } else {
+      None
+    }
+  }
+
+  private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
+    "spark.streaming.kafka.maxRatePerPartition", 0)
+
+  protected[streaming] def maxMessagesPerPartition(
+    offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
+    val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+
+    // calculate a per-partition rate limit based on current lag
+    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
+      case Some(rate) =>
+        val lagPerPartition = offsets.map { case (tp, offset) =>
+          tp -> Math.max(offset - currentOffsets(tp), 0)
+        }
+        val totalLag = lagPerPartition.values.sum
+
+        lagPerPartition.map { case (tp, lag) =>
+          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+          tp -> (if (maxRateLimitPerPartition > 0) {
+            Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
+        }
+      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+    }
+
+    if (effectiveRateLimitPerPartition.values.sum > 0) {
+      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+      Some(effectiveRateLimitPerPartition.map {
+        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+      })
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Returns the latest (highest) available offsets, taking new partitions into account.
+   */
+  protected def latestOffsets(): Map[TopicPartition, Long] = {
+    val c = consumer
+    c.poll(0)
+    val parts = c.assignment().asScala
+
+    // make sure new partitions are reflected in currentOffsets
+    val newPartitions = parts.diff(currentOffsets.keySet)
+    // position for new partitions determined by auto.offset.reset if no commit
+    currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
+    // don't want to consume messages, so pause
+    c.pause(newPartitions.asJava)
+    // find latest available offsets
+    c.seekToEnd(currentOffsets.keySet.asJava)
+    parts.map(tp => tp -> c.position(tp)).toMap
+  }
+
+  // limits the maximum number of messages per partition
+  protected def clamp(
+    offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+
+    maxMessagesPerPartition(offsets).map { mmp =>
+      mmp.map { case (tp, messages) =>
+          val uo = offsets(tp)
+          tp -> Math.min(currentOffsets(tp) + messages, uo)
+      }
+    }.getOrElse(offsets)
+  }
+
+  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
+    val untilOffsets = clamp(latestOffsets())
+    val offsetRanges = untilOffsets.map { case (tp, uo) =>
+      val fo = currentOffsets(tp)
+      OffsetRange(tp.topic, tp.partition, fo, uo)
+    }
+    val rdd = new KafkaRDD[K, V](
+      context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
+
+    // Report the record number and metadata of this batch interval to InputInfoTracker.
+    val description = offsetRanges.filter { offsetRange =>
+      // Don't display empty ranges.
+      offsetRange.fromOffset != offsetRange.untilOffset
+    }.map { offsetRange =>
+      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+    }.mkString("\n")
+    // Copy offsetRanges to immutable.List to prevent from being modified by the user
+    val metadata = Map(
+      "offsets" -> offsetRanges.toList,
+      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
+    currentOffsets = untilOffsets
+    commitAll()
+    Some(rdd)
+  }
+
+  override def start(): Unit = {
+    val c = consumer
+    c.poll(0)
+    if (currentOffsets.isEmpty) {
+      currentOffsets = c.assignment().asScala.map { tp =>
+        tp -> c.position(tp)
+      }.toMap
+    }
+
+    // don't actually want to consume any messages, so pause all partitions
+    c.pause(currentOffsets.keySet.asJava)
+  }
+
+  override def stop(): Unit = this.synchronized {
+    if (kc != null) {
+      kc.close()
+    }
+  }
+
+  protected val commitQueue = new ConcurrentLinkedQueue[OffsetRange]
+  protected val commitCallback = new AtomicReference[OffsetCommitCallback]
+
+  /**
+   * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
+   * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+   */
+  def commitAsync(offsetRanges: Array[OffsetRange]): Unit = {
+    commitAsync(offsetRanges, null)
+  }
+
+  /**
+   * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
+   * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+   * @param callback Only the most recently provided callback will be used at commit.
+   */
+  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = {
+    commitCallback.set(callback)
+    commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*))
+  }
+
+  protected def commitAll(): Unit = {
+    val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
+    val it = commitQueue.iterator()
+    while (it.hasNext) {
+      val osr = it.next
+      val tp = osr.topicPartition
+      val x = m.get(tp)
+      val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
+      m.put(tp, new OffsetAndMetadata(offset))
+    }
+    if (!m.isEmpty) {
+      consumer.commitAsync(m, commitCallback.get)
+    }
+  }
+
+  private[streaming]
+  class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+    def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+      data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+    }
+
+    override def update(time: Time): Unit = {
+      batchForTime.clear()
+      generatedRDDs.foreach { kv =>
+        val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray
+        batchForTime += kv._1 -> a
+      }
+    }
+
+    override def cleanup(time: Time): Unit = { }
+
+    override def restore(): Unit = {
+      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+         logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+         generatedRDDs += t -> new KafkaRDD[K, V](
+           context.sparkContext,
+           executorKafkaParams,
+           b.map(OffsetRange(_)),
+           getPreferredHosts,
+           // during restore, it's possible the same partition will be consumed from multiple
+           // threads, so don't use the cache
+           false
+         )
+      }
+    }
+  }
+
+  /**
+   * A RateController to retrieve the rate from RateEstimator.
+   */
+  private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
+    extends RateController(id, estimator) {
+    override def publish(rate: Long): Unit = ()
+  }
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
new file mode 100644
index 0000000000..c15c163449
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for processing that partition.
+ * In most cases, use [[PreferConsistent]].
+ * Use [[PreferBrokers]] if your executors are on the same nodes as your Kafka brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+private[spark] class KafkaRDD[K, V](
+    sc: SparkContext,
+    val kafkaParams: ju.Map[String, Object],
+    val offsetRanges: Array[OffsetRange],
+    val preferredHosts: ju.Map[TopicPartition, String],
+    useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges {
+
+  assert("none" ==
+    kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+      " must be set to none for executor kafka params, else messages may not match offsetRange")
+
+  assert(false ==
+    kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+      " must be set to false for executor kafka params, else offsets may commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
+  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+    conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+    conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+    conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+    logError("Kafka ConsumerRecord is not serializable. " +
+      "Use .map to extract fields before calling .persist or .window")
+    super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+    offsetRanges.zipWithIndex.map { case (o, i) =>
+        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
+    }.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def countApprox(
+      timeout: Long,
+      confidence: Double = 0.95
+  ): PartialResult[BoundedDouble] = {
+    val c = count
+    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+  }
+
+  override def isEmpty(): Boolean = count == 0L
+
+  override def take(num: Int): Array[ConsumerRecord[K, V]] = {
+    val nonEmptyPartitions = this.partitions
+      .map(_.asInstanceOf[KafkaRDDPartition])
+      .filter(_.count > 0)
+
+    if (num < 1 || nonEmptyPartitions.isEmpty) {
+      return new Array[ConsumerRecord[K, V]](0)
+    }
+
+    // Determine in advance how many messages need to be taken from each partition
+    val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
+      val remain = num - result.values.sum
+      if (remain > 0) {
+        val taken = Math.min(remain, part.count)
+        result + (part.index -> taken.toInt)
+      } else {
+        result
+      }
+    }
+
+    val buf = new ArrayBuffer[ConsumerRecord[K, V]]
+    val res = context.runJob(
+      this,
+      (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
+      it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
+    )
+    res.foreach(buf ++= _)
+    buf.toArray
+  }
+
+  private def executors(): Array[ExecutorCacheTaskLocation] = {
+    val bm = sparkContext.env.blockManager
+    bm.master.getPeers(bm.blockManagerId).toArray
+      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+      .sortWith(compareExecutors)
+  }
+
+  protected[kafka010] def compareExecutors(
+      a: ExecutorCacheTaskLocation,
+      b: ExecutorCacheTaskLocation): Boolean =
+    if (a.host == b.host) {
+      a.executorId > b.executorId
+    } else {
+      a.host > b.host
+    }
+
+  /**
+   * Non-negative modulus, from java 8 math
+   */
+  private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b
+
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+    // The intention is best-effort consistent executor for a given topicpartition,
+    // so that caching consumers can be effective.
+    // TODO what about hosts specified by ip vs name
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    val allExecs = executors()
+    val tp = part.topicPartition
+    val prefHost = preferredHosts.get(tp)
+    val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost)
+    val execs = if (prefExecs.isEmpty) allExecs else prefExecs
+    if (execs.isEmpty) {
+      Seq()
+    } else {
+      // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index
+      val index = this.floorMod(tp.hashCode, execs.length)
+      val chosen = execs(index)
+      Seq(chosen.toString)
+    }
+  }
+
+  private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+    s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+      s"for topic ${part.topic} partition ${part.partition}. " +
+      "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+  override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+    if (part.fromOffset == part.untilOffset) {
+      logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+        s"skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new KafkaRDDIterator(part, context)
+    }
+  }
+
+  /**
+   * An iterator that fetches messages directly from Kafka for the offsets in partition.
+   * Uses a cached consumer where possible to take advantage of prefetching
+   */
+  private class KafkaRDDIterator(
+      part: KafkaRDDPartition,
+      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
+
+    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
+      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+    context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+    val consumer = if (useConsumerCache) {
+      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
+      if (context.attemptNumber > 1) {
+        // just in case the prior attempt failures were cache related
+        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
+      }
+      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
+    } else {
+      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
+    }
+
+    var requestOffset = part.fromOffset
+
+    def closeIfNeeded(): Unit = {
+      if (!useConsumerCache && consumer != null) {
+        consumer.close
+      }
+    }
+
+    override def hasNext(): Boolean = requestOffset < part.untilOffset
+
+    override def next(): ConsumerRecord[K, V] = {
+      assert(hasNext(), "Can't call next() once untilOffset has been reached")
+      val r = consumer.get(requestOffset, pollTimeout)
+      requestOffset += 1
+      r
+    }
+  }
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala
new file mode 100644
index 0000000000..95569b109f
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.Partition
+
+
+/**
+ * @param index the index of this partition within its RDD
+ * @param topic kafka topic name
+ * @param partition kafka partition id
+ * @param fromOffset inclusive starting offset
+ * @param untilOffset exclusive ending offset
+ */
+private[kafka010]
+class KafkaRDDPartition(
+  val index: Int,
+  val topic: String,
+  val partition: Int,
+  val fromOffset: Long,
+  val untilOffset: Long
+) extends Partition {
+  /** Number of messages this partition refers to */
+  def count(): Long = untilOffset - fromOffset
+
+  /** Kafka TopicPartition object, for convenience */
+  def topicPartition(): TopicPartition = new TopicPartition(topic, partition)
+
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
new file mode 100644
index 0000000000..13c08430db
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.io.File
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.{Map => JMap, Properties}
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.serializer.StringEncoder
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.ZkUtils
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
+
+/**
+ * This is a helper class for Kafka test suites. It can set up and tear down local Kafka
+ * servers, and push data to them using Kafka producers.
+ *
+ * This utility class lives in src (rather than test) so that Python-related Kafka APIs
+ * can be tested against it as well.
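+ *
+ * A typical test would use it roughly as follows (illustrative only; the topic name and
+ * message counts are placeholders):
+ * {{{
+ *   val testUtils = new KafkaTestUtils
+ *   testUtils.setup()
+ *   testUtils.createTopic("topic", 2)
+ *   testUtils.sendMessages("topic", Map("a" -> 1, "b" -> 2))
+ *   // ... run assertions against testUtils.brokerAddress ...
+ *   testUtils.teardown()
+ * }}}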
+ */
+private[kafka010] class KafkaTestUtils extends Logging {
+
+  // Zookeeper related configurations
+  private val zkHost = "localhost"
+  private var zkPort: Int = 0
+  private val zkConnectionTimeout = 60000
+  private val zkSessionTimeout = 6000
+
+  private var zookeeper: EmbeddedZookeeper = _
+
+  private var zkUtils: ZkUtils = _
+
+  // Kafka broker related configurations
+  private val brokerHost = "localhost"
+  private var brokerPort = 9092
+  private var brokerConf: KafkaConfig = _
+
+  // Kafka broker server
+  private var server: KafkaServer = _
+
+  // Kafka producer
+  private var producer: Producer[String, String] = _
+
+  // Flag to test whether the system is correctly started
+  private var zkReady = false
+  private var brokerReady = false
+
+  def zkAddress: String = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
+    s"$zkHost:$zkPort"
+  }
+
+  def brokerAddress: String = {
+    assert(brokerReady, "Kafka not set up yet or already torn down, cannot get broker address")
+    s"$brokerHost:$brokerPort"
+  }
+
+  def zookeeperClient: ZkUtils = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+    Option(zkUtils).getOrElse(
+      throw new IllegalStateException("Zookeeper client is not yet initialized"))
+  }
+
+  // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+  private def setupEmbeddedZookeeper(): Unit = {
+    // Zookeeper server startup
+    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    // Get the actual zookeeper binding port
+    zkPort = zookeeper.actualPort
+    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+    zkReady = true
+  }
+
+  // Set up the Embedded Kafka server
+  private def setupEmbeddedKafkaServer(): Unit = {
+    assert(zkReady, "Zookeeper should be set up beforehand")
+
+    // Kafka broker startup
+    Utils.startServiceOnPort(brokerPort, port => {
+      brokerPort = port
+      brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
+      server = new KafkaServer(brokerConf)
+      server.startup()
+      (server, port)
+    }, new SparkConf(), "KafkaBroker")
+
+    brokerReady = true
+  }
+
+  /** Set up the embedded servers, including Zookeeper and the Kafka broker */
+  def setup(): Unit = {
+    setupEmbeddedZookeeper()
+    setupEmbeddedKafkaServer()
+  }
+
+  /** Tear down the embedded servers, including the Kafka broker and Zookeeper */
+  def teardown(): Unit = {
+    brokerReady = false
+    zkReady = false
+
+    if (producer != null) {
+      producer.close()
+      producer = null
+    }
+
+    if (server != null) {
+      server.shutdown()
+      server = null
+    }
+
+    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
+    }
+
+    if (zookeeper != null) {
+      zookeeper.shutdown()
+      zookeeper = null
+    }
+  }
+
+  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+  def createTopic(topic: String, partitions: Int): Unit = {
+    AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+    // wait until metadata is propagated
+    (0 until partitions).foreach { p =>
+      waitUntilMetadataIsPropagated(topic, p)
+    }
+  }
+
+  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+  def createTopic(topic: String): Unit = {
+    createTopic(topic, 1)
+  }
+
+  /** Java-friendly function for sending messages to the Kafka broker */
+  def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+    sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+  }
+
+  /** Send the messages to the Kafka broker */
+  def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+    sendMessages(topic, messages)
+  }
+
+  /** Send the array of messages to the Kafka broker */
+  def sendMessages(topic: String, messages: Array[String]): Unit = {
+    producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
+    producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
+    producer.close()
+    producer = null
+  }
+
+  private def brokerConfiguration: Properties = {
+    val props = new Properties()
+    props.put("broker.id", "0")
+    props.put("host.name", "localhost")
+    props.put("port", brokerPort.toString)
+    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+    props.put("zookeeper.connect", zkAddress)
+    props.put("log.flush.interval.messages", "1")
+    props.put("replica.socket.timeout.ms", "1500")
+    props
+  }
+
+  private def producerConfiguration: Properties = {
+    val props = new Properties()
+    props.put("metadata.broker.list", brokerAddress)
+    props.put("serializer.class", classOf[StringEncoder].getName)
+    // wait for all in-sync replicas to ack sends
+    props.put("request.required.acks", "-1")
+    props
+  }
+
+  // A simplified version of ScalaTest's eventually, rewritten here to avoid adding an extra
+  // test dependency.
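+  // e.g. eventually(Time(10000), Time(100)) { assert(condition) }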
+  def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
+    def makeAttempt(): Either[Throwable, T] = {
+      try {
+        Right(func)
+      } catch {
+        case e if NonFatal(e) => Left(e)
+      }
+    }
+
+    val startTime = System.currentTimeMillis()
+    @tailrec
+    def tryAgain(attempt: Int): T = {
+      makeAttempt() match {
+        case Right(result) => result
+        case Left(e) =>
+          val duration = System.currentTimeMillis() - startTime
+          if (duration < timeout.milliseconds) {
+            Thread.sleep(interval.milliseconds)
+          } else {
+            throw new TimeoutException(e.getMessage)
+          }
+
+          tryAgain(attempt + 1)
+      }
+    }
+
+    tryAgain(1)
+  }
+
+  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+      case Some(partitionState) =>
+        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+        zkUtils.getLeaderForPartition(topic, partition).isDefined &&
+          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+          leaderAndInSyncReplicas.isr.size >= 1
+
+      case _ =>
+        false
+    }
+    eventually(Time(10000), Time(100)) {
+      assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
+    }
+  }
+
+  private class EmbeddedZookeeper(val zkConnect: String) {
+    val snapshotDir = Utils.createTempDir()
+    val logDir = Utils.createTempDir()
+
+    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+    val (ip, port) = {
+      val splits = zkConnect.split(":")
+      (splits(0), splits(1).toInt)
+    }
+    val factory = new NIOServerCnxnFactory()
+    factory.configure(new InetSocketAddress(ip, port), 16)
+    factory.startup(zookeeper)
+
+    val actualPort = factory.getLocalPort
+
+    def shutdown() {
+      factory.shutdown()
+      Utils.deleteRecursively(snapshotDir)
+      Utils.deleteRecursively(logDir)
+    }
+  }
+}
+
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
new file mode 100644
index 0000000000..c0524990bc
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
+import org.apache.spark.api.java.function.{ Function0 => JFunction0 }
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext }
+import org.apache.spark.streaming.dstream._
+
+/**
+ * :: Experimental ::
+ * Companion object for constructing Kafka streams and RDDs
+ */
+@Experimental
+object KafkaUtils extends Logging {
+  /**
+   * :: Experimental ::
+   * Scala constructor for a batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
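+   *
+   * For example (illustrative only; brokers, topic, group id and offsets are placeholders):
+   * {{{
+   *   val kafkaParams = new java.util.HashMap[String, Object]()
+   *   kafkaParams.put("bootstrap.servers", "host1:9092,host2:9092")
+   *   kafkaParams.put("key.deserializer", classOf[StringDeserializer])
+   *   kafkaParams.put("value.deserializer", classOf[StringDeserializer])
+   *   kafkaParams.put("group.id", "example-group")
+   *   val offsetRanges = Array(OffsetRange("topic", 0, 0L, 100L))
+   *   val rdd = KafkaUtils.createRDD[String, String](
+   *     sc, kafkaParams, offsetRanges, PreferConsistent)
+   * }}}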
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+   * configuration parameters</a>. Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+   *   see [[LocationStrategy]] for more details.
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   */
+  @Experimental
+  def createRDD[K, V](
+      sc: SparkContext,
+      kafkaParams: ju.Map[String, Object],
+      offsetRanges: Array[OffsetRange],
+      locationStrategy: LocationStrategy
+    ): RDD[ConsumerRecord[K, V]] = {
+    val preferredHosts = locationStrategy match {
+      case PreferBrokers =>
+        throw new AssertionError(
+          "If you want to prefer brokers, you must provide a mapping using PreferFixed " +
+          "A single KafkaRDD does not have a driver consumer and cannot look up brokers for you.")
+      case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
+      case PreferFixed(hostMap) => hostMap
+    }
+    val kp = new ju.HashMap[String, Object](kafkaParams)
+    fixKafkaParams(kp)
+    val osr = offsetRanges.clone()
+
+    new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true)
+  }
+
+  /**
+   * :: Experimental ::
+   * Java constructor for a batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param keyClass Class of the keys in the Kafka records
+   * @param valueClass Class of the values in the Kafka records
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+   * configuration parameters</a>. Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+   *   see [[LocationStrategy]] for more details.
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   */
+  @Experimental
+  def createRDD[K, V](
+      jsc: JavaSparkContext,
+      kafkaParams: ju.Map[String, Object],
+      offsetRanges: Array[OffsetRange],
+      locationStrategy: LocationStrategy
+    ): JavaRDD[ConsumerRecord[K, V]] = {
+
+    new JavaRDD(createRDD[K, V](jsc.sc, kafkaParams, offsetRanges, locationStrategy))
+  }
+
+  /**
+   * :: Experimental ::
+   * Scala constructor for a DStream where
+   * each given Kafka topic/partition corresponds to an RDD partition.
+   * The Spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum
+   * number of messages per second that each '''partition''' will accept.
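+   *
+   * For example (illustrative only; brokers, topics and group id are placeholders):
+   * {{{
+   *   val kafkaParams = Map[String, Object](
+   *     "bootstrap.servers" -> "host1:9092,host2:9092",
+   *     "key.deserializer" -> classOf[StringDeserializer],
+   *     "value.deserializer" -> classOf[StringDeserializer],
+   *     "group.id" -> "example-group",
+   *     "auto.offset.reset" -> "latest")
+   *   val stream = KafkaUtils.createDirectStream[String, String](
+   *     ssc, PreferConsistent, Subscribe[String, String](List("topicA", "topicB"), kafkaParams))
+   * }}}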
+   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+   *   see [[LocationStrategy]] for more details.
+   * @param consumerStrategy In most cases, pass in [[Subscribe]],
+   *   see [[ConsumerStrategy]] for more details
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   */
+  @Experimental
+  def createDirectStream[K, V](
+      ssc: StreamingContext,
+      locationStrategy: LocationStrategy,
+      consumerStrategy: ConsumerStrategy[K, V]
+    ): InputDStream[ConsumerRecord[K, V]] = {
+    new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy)
+  }
+
+  /**
+   * :: Experimental ::
+   * Java constructor for a DStream where
+   * each given Kafka topic/partition corresponds to an RDD partition.
+   * @param keyClass Class of the keys in the Kafka records
+   * @param valueClass Class of the values in the Kafka records
+   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+   *   see [[LocationStrategy]] for more details.
+   * @param consumerStrategy In most cases, pass in [[Subscribe]],
+   *   see [[ConsumerStrategy]] for more details
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   */
+  @Experimental
+  def createDirectStream[K, V](
+      jssc: JavaStreamingContext,
+      locationStrategy: LocationStrategy,
+      consumerStrategy: ConsumerStrategy[K, V]
+    ): JavaInputDStream[ConsumerRecord[K, V]] = {
+    new JavaInputDStream(
+      createDirectStream[K, V](
+        jssc.ssc, locationStrategy, consumerStrategy))
+  }
+
+  /**
+   * Tweak kafka params to prevent issues on executors
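+   * e.g. a driver group.id of "mygroup" becomes "spark-executor-mygroup" on executors, and
+   * enable.auto.commit / auto.offset.reset are forced to false / "none" respectively.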
+   */
+  private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
+    logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
+    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
+
+    logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
+    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+
+    // driver and executor should be in different consumer groups
+    val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
+    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+
+    // possible workaround for KAFKA-3135
+    val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
+    if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
+      logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
+      kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
+    }
+  }
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
new file mode 100644
index 0000000000..df620300ea
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ *  :: Experimental ::
+ * Choice of how to schedule consumers for a given TopicPartition on an executor.
+ * Kafka 0.10 consumers prefetch messages, so it's important for performance
+ * to keep cached consumers on appropriate executors, not recreate them for every partition.
+ * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere.
+ */
+@Experimental
+sealed trait LocationStrategy
+
+/**
+ *  :: Experimental ::
+ * Use this only if your executors are on the same nodes as your Kafka brokers.
+ */
+@Experimental
+case object PreferBrokers extends LocationStrategy {
+  def create: PreferBrokers.type = this
+}
+
+/**
+ *  :: Experimental ::
+ * Use this in most cases; it will consistently distribute partitions across all executors.
+ */
+@Experimental
+case object PreferConsistent extends LocationStrategy {
+  def create: PreferConsistent.type = this
+}
+
+/**
+ *  :: Experimental ::
+ * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
+ * Any TopicPartition not specified in the map will use a consistent location.
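+ * For example (illustrative only; topic and hosts are placeholders):
+ * {{{
+ *   PreferFixed(Map(
+ *     new TopicPartition("topic", 0) -> "host1",
+ *     new TopicPartition("topic", 1) -> "host2"))
+ * }}}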
+ */
+@Experimental
+case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy
+
+/**
+ *  :: Experimental ::
+ * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
+ * Any TopicPartition not specified in the map will use a consistent location.
+ */
+@Experimental
+object PreferFixed {
+  def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = {
+    PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
+  }
+  def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed =
+    PreferFixed(hostMap)
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
new file mode 100644
index 0000000000..c66d3c9b8d
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.clients.consumer.OffsetCommitCallback
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream]]).
+ * {{{
+ *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ *      ...
+ *   }
+ * }}}
+ */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/**
+ *  :: Experimental ::
+ * Represents any object that can commit a collection of [[OffsetRange]]s.
+ * The direct Kafka DStream implements this interface (see
+ * [[KafkaUtils.createDirectStream]]).
+ * {{{
+ *   val stream = KafkaUtils.createDirectStream(...)
+ *     ...
+ *   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback() {
+ *     def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) {
+ *        if (null != e) {
+ *           // error
+ *        } else {
+ *         // success
+ *       }
+ *     }
+ *   })
+ * }}}
+ */
+@Experimental
+trait CanCommitOffsets {
+  /**
+   *  :: Experimental ::
+   * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
+   * This is only needed if you intend to store offsets in Kafka, instead of your own store.
+   * @param offsetRanges the offset ranges to commit; the maximum untilOffset
+   *   for a given partition will be used at commit.
+   */
+  @Experimental
+  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
+
+  /**
+   *  :: Experimental ::
+   * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
+   * This is only needed if you intend to store offsets in Kafka, instead of your own store.
+   * @param offsetRanges the offset ranges to commit; the maximum untilOffset
+   *   for a given partition will be used at commit.
+   * @param callback Only the most recently provided callback will be used at commit.
+   */
+  @Experimental
+  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
+}
+
+/**
+ * Represents a range of offsets from a single Kafka TopicPartition. Instances of this class
+ * can be created with `OffsetRange.create()`.
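+ * For example (topic name and offsets are placeholders):
+ * {{{
+ *   OffsetRange.create("topic", 0, 0L, 100L)  // offsets 0 until 100 of partition 0
+ * }}}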
+ * @param topic Kafka topic name
+ * @param partition Kafka partition id
+ * @param fromOffset Inclusive starting offset
+ * @param untilOffset Exclusive ending offset
+ */
+final class OffsetRange private(
+    val topic: String,
+    val partition: Int,
+    val fromOffset: Long,
+    val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** Kafka TopicPartition object, for convenience */
+  def topicPartition(): TopicPartition = new TopicPartition(topic, partition)
+
+  /** Number of messages this OffsetRange refers to */
+  def count(): Long = untilOffset - fromOffset
+
+  override def equals(obj: Any): Boolean = obj match {
+    case that: OffsetRange =>
+      this.topic == that.topic &&
+        this.partition == that.partition &&
+        this.fromOffset == that.fromOffset &&
+        this.untilOffset == that.untilOffset
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    toTuple.hashCode()
+  }
+
+  override def toString(): String = {
+    s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])"
+  }
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+/**
+ * Companion object that provides methods to create instances of [[OffsetRange]].
+ */
+object OffsetRange {
+  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+    new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+  def create(
+      topicPartition: TopicPartition,
+      fromOffset: Long,
+      untilOffset: Long): OffsetRange =
+    new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset)
+
+  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+    new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+  def apply(
+      topicPartition: TopicPartition,
+      fromOffset: Long,
+      untilOffset: Long): OffsetRange =
+    new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset)
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[kafka010]
+  type OffsetRangeTuple = (String, Int, Long, Long)
+
+  private[kafka010]
+  def apply(t: OffsetRangeTuple): OffsetRange =
+    new OffsetRange(t._1, t._2, t._3, t._4)
+}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java
new file mode 100644
index 0000000000..ebfcf8764a
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Spark Integration for Kafka 0.10
+ */
+package org.apache.spark.streaming.kafka010;
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
new file mode 100644
index 0000000000..2bfc1e84d7
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Spark Integration for Kafka 0.10
+ */
+package object kafka010
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
new file mode 100644
index 0000000000..aba45f5de6
--- /dev/null
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaConsumerStrategySuite implements Serializable {
+
+  @Test
+  public void testConsumerStrategyConstructors() {
+    final String topic1 = "topic1";
+    final Collection<String> topics = Arrays.asList(topic1);
+    final scala.collection.Iterable<String> sTopics =
+      JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
+    final TopicPartition tp1 = new TopicPartition(topic1, 0);
+    final TopicPartition tp2 = new TopicPartition(topic1, 1);
+    final Collection<TopicPartition> parts = Arrays.asList(tp1, tp2);
+    final scala.collection.Iterable<TopicPartition> sParts =
+      JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
+    final Map<String, Object> kafkaParams = new HashMap<String, Object>();
+    kafkaParams.put("bootstrap.servers", "not used");
+    final scala.collection.Map<String, Object> sKafkaParams =
+      JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
+    final Map<TopicPartition, Object> offsets = new HashMap<>();
+    offsets.put(tp1, 23L);
+    final scala.collection.Map<TopicPartition, Object> sOffsets =
+      JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+
+    // make sure constructors can be called from java
+    final ConsumerStrategy<String, String> sub0 =
+      Subscribe.<String, String>apply(topics, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> sub1 =
+      Subscribe.<String, String>apply(sTopics, sKafkaParams, sOffsets);
+    final ConsumerStrategy<String, String> sub2 =
+      Subscribe.<String, String>apply(sTopics, sKafkaParams);
+    final ConsumerStrategy<String, String> sub3 =
+      Subscribe.<String, String>create(topics, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> sub4 =
+      Subscribe.<String, String>create(topics, kafkaParams);
+
+    Assert.assertEquals(
+      sub1.executorKafkaParams().get("bootstrap.servers"),
+      sub3.executorKafkaParams().get("bootstrap.servers"));
+
+    final ConsumerStrategy<String, String> asn0 =
+      Assign.<String, String>apply(parts, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> asn1 =
+      Assign.<String, String>apply(sParts, sKafkaParams, sOffsets);
+    final ConsumerStrategy<String, String> asn2 =
+      Assign.<String, String>apply(sParts, sKafkaParams);
+    final ConsumerStrategy<String, String> asn3 =
+      Assign.<String, String>create(parts, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> asn4 =
+      Assign.<String, String>create(parts, kafkaParams);
+
+    Assert.assertEquals(
+      asn1.executorKafkaParams().get("bootstrap.servers"),
+      asn3.executorKafkaParams().get("bootstrap.servers"));
+  }
+
+}
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 0000000000..e57ede7afa
--- /dev/null
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+public class JavaDirectKafkaStreamSuite implements Serializable {
+  private transient JavaStreamingContext ssc = null;
+  private transient KafkaTestUtils kafkaTestUtils = null;
+
+  @Before
+  public void setUp() {
+    kafkaTestUtils = new KafkaTestUtils();
+    kafkaTestUtils.setup();
+    SparkConf sparkConf = new SparkConf()
+      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+    ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+  }
+
+  @After
+  public void tearDown() {
+    if (ssc != null) {
+      ssc.stop();
+      ssc = null;
+    }
+
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown();
+      kafkaTestUtils = null;
+    }
+  }
+
+  @Test
+  public void testKafkaStream() throws InterruptedException {
+    final String topic1 = "topic1";
+    final String topic2 = "topic2";
+    // hold a reference to the current offset ranges, so it can be used downstream
+    final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
+
+    String[] topic1data = createTopicAndSendData(topic1);
+    String[] topic2data = createTopicAndSendData(topic2);
+
+    Set<String> sent = new HashSet<>();
+    sent.addAll(Arrays.asList(topic1data));
+    sent.addAll(Arrays.asList(topic2data));
+
+    Random random = new Random();
+
+    final Map<String, Object> kafkaParams = new HashMap<>();
+    kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
+    kafkaParams.put("key.deserializer", StringDeserializer.class);
+    kafkaParams.put("value.deserializer", StringDeserializer.class);
+    kafkaParams.put("auto.offset.reset", "earliest");
+    kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() +
+      "-" + System.currentTimeMillis());
+
+    JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
+        ssc,
+        PreferConsistent.create(),
+        Subscribe.<String, String>create(Arrays.asList(topic1), kafkaParams)
+    );
+
+    JavaDStream<String> stream1 = istream1.transform(
+      // Make sure you can get offset ranges from the rdd
+      new Function<JavaRDD<ConsumerRecord<String, String>>,
+        JavaRDD<ConsumerRecord<String, String>>>() {
+          @Override
+          public JavaRDD<ConsumerRecord<String, String>> call(
+            JavaRDD<ConsumerRecord<String, String>> rdd
+          ) {
+            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+            offsetRanges.set(offsets);
+            Assert.assertEquals(topic1, offsets[0].topic());
+            return rdd;
+          }
+        }
+    ).map(
+        new Function<ConsumerRecord<String, String>, String>() {
+          @Override
+          public String call(ConsumerRecord<String, String> r) {
+            return r.value();
+          }
+        }
+    );
+
+    final Map<String, Object> kafkaParams2 = new HashMap<>(kafkaParams);
+    kafkaParams2.put("group.id", "java-test-consumer-" + random.nextInt() +
+      "-" + System.currentTimeMillis());
+
+    JavaInputDStream<ConsumerRecord<String, String>> istream2 = KafkaUtils.createDirectStream(
+        ssc,
+        PreferConsistent.create(),
+        Subscribe.<String, String>create(Arrays.asList(topic2), kafkaParams2)
+    );
+
+    JavaDStream<String> stream2 = istream2.transform(
+      // Make sure you can get offset ranges from the rdd
+      new Function<JavaRDD<ConsumerRecord<String, String>>,
+        JavaRDD<ConsumerRecord<String, String>>>() {
+          @Override
+          public JavaRDD<ConsumerRecord<String, String>> call(
+            JavaRDD<ConsumerRecord<String, String>> rdd
+          ) {
+            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+            offsetRanges.set(offsets);
+            Assert.assertEquals(topic2, offsets[0].topic());
+            return rdd;
+          }
+        }
+    ).map(
+        new Function<ConsumerRecord<String, String>, String>() {
+          @Override
+          public String call(ConsumerRecord<String, String> r) {
+            return r.value();
+          }
+        }
+    );
+
+    JavaDStream<String> unifiedStream = stream1.union(stream2);
+
+    final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
+    unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+          @Override
+          public void call(JavaRDD<String> rdd) {
+            result.addAll(rdd.collect());
+          }
+        }
+    );
+    ssc.start();
+    long startTime = System.currentTimeMillis();
+    boolean matches = false;
+    while (!matches && System.currentTimeMillis() - startTime < 20000) {
+      matches = sent.size() == result.size();
+      Thread.sleep(50);
+    }
+    Assert.assertEquals(sent, result);
+    ssc.stop();
+  }
+
+  private String[] createTopicAndSendData(String topic) {
+    String[] data = { topic + "-1", topic + "-2", topic + "-3" };
+    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.sendMessages(topic, data);
+    return data;
+  }
+}
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
new file mode 100644
index 0000000000..548ba134dc
--- /dev/null
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+public class JavaKafkaRDDSuite implements Serializable {
+  private transient JavaSparkContext sc = null;
+  private transient KafkaTestUtils kafkaTestUtils = null;
+
+  @Before
+  public void setUp() {
+    kafkaTestUtils = new KafkaTestUtils();
+    kafkaTestUtils.setup();
+    SparkConf sparkConf = new SparkConf()
+      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+    sc = new JavaSparkContext(sparkConf);
+  }
+
+  @After
+  public void tearDown() {
+    if (sc != null) {
+      sc.stop();
+      sc = null;
+    }
+
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown();
+      kafkaTestUtils = null;
+    }
+  }
+
+  @Test
+  public void testKafkaRDD() throws InterruptedException {
+    String topic1 = "topic1";
+    String topic2 = "topic2";
+
+    createTopicAndSendData(topic1);
+    createTopicAndSendData(topic2);
+
+    Map<String, Object> kafkaParams = new HashMap<>();
+    kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
+    kafkaParams.put("key.deserializer", StringDeserializer.class);
+    kafkaParams.put("value.deserializer", StringDeserializer.class);
+
+    OffsetRange[] offsetRanges = {
+      OffsetRange.create(topic1, 0, 0, 1),
+      OffsetRange.create(topic2, 0, 0, 1)
+    };
+
+    Map<TopicPartition, String> leaders = new HashMap<>();
+    String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
+    String broker = hostAndPort[0];
+    leaders.put(offsetRanges[0].topicPartition(), broker);
+    leaders.put(offsetRanges[1].topicPartition(), broker);
+
+    Function<ConsumerRecord<String, String>, String> handler =
+      new Function<ConsumerRecord<String, String>, String>() {
+        @Override
+        public String call(ConsumerRecord<String, String> r) {
+          return r.value();
+        }
+      };
+
+    JavaRDD<String> rdd1 = KafkaUtils.<String, String>createRDD(
+        sc,
+        kafkaParams,
+        offsetRanges,
+        PreferFixed.create(leaders)
+    ).map(handler);
+
+    JavaRDD<String> rdd2 = KafkaUtils.<String, String>createRDD(
+        sc,
+        kafkaParams,
+        offsetRanges,
+        PreferConsistent.create()
+    ).map(handler);
+
+    // just making sure the java user apis work; the scala tests handle logic corner cases
+    long count1 = rdd1.count();
+    long count2 = rdd2.count();
+    Assert.assertTrue(count1 > 0);
+    Assert.assertEquals(count1, count2);
+  }
+
+  private String[] createTopicAndSendData(String topic) {
+    String[] data = { topic + "-1", topic + "-2", topic + "-3" };
+    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.sendMessages(topic, data);
+    return data;
+  }
+}
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
new file mode 100644
index 0000000000..7873c09e1a
--- /dev/null
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaLocationStrategySuite implements Serializable {
+
+  @Test
+  public void testLocationStrategyConstructors() {
+    final String topic1 = "topic1";
+    final TopicPartition tp1 = new TopicPartition(topic1, 0);
+    final TopicPartition tp2 = new TopicPartition(topic1, 1);
+    final Map<TopicPartition, String> hosts = new HashMap<>();
+    hosts.put(tp1, "node1");
+    hosts.put(tp2, "node2");
+    final scala.collection.Map<TopicPartition, String> sHosts =
+      JavaConverters.mapAsScalaMapConverter(hosts).asScala();
+
+    // make sure constructors can be called from java
+    final LocationStrategy c1 = PreferConsistent.create();
+    final LocationStrategy c2 = PreferConsistent$.MODULE$;
+    Assert.assertEquals(c1, c2);
+
+    final LocationStrategy c3 = PreferBrokers.create();
+    final LocationStrategy c4 = PreferBrokers$.MODULE$;
+    Assert.assertEquals(c3, c4);
+
+    final LocationStrategy c5 = PreferFixed.create(hosts);
+    final LocationStrategy c6 = PreferFixed.apply(sHosts);
+    Assert.assertEquals(c5, c6);
+
+  }
+
+}
diff --git a/external/kafka-0-10/src/test/resources/log4j.properties b/external/kafka-0-10/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..75e3b53a09
--- /dev/null
+++ b/external/kafka-0-10/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
new file mode 100644
index 0000000000..776d11ad2f
--- /dev/null
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.io.File
+import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.util.Utils
+
+class DirectKafkaStreamSuite
+  extends SparkFunSuite
+  with BeforeAndAfter
+  with BeforeAndAfterAll
+  with Eventually
+  with Logging {
+  val sparkConf = new SparkConf()
+    .setMaster("local[4]")
+    .setAppName(this.getClass.getSimpleName)
+
+  private var sc: SparkContext = _
+  private var ssc: StreamingContext = _
+  private var testDir: File = _
+
+  private var kafkaTestUtils: KafkaTestUtils = _
+
+  override def beforeAll {
+    kafkaTestUtils = new KafkaTestUtils
+    kafkaTestUtils.setup()
+  }
+
+  override def afterAll {
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown()
+      kafkaTestUtils = null
+    }
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      sc = null
+    }
+    if (sc != null) {
+      sc.stop()
+    }
+    if (testDir != null) {
+      Utils.deleteRecursively(testDir)
+    }
+  }
+
+  def getKafkaParams(extra: (String, Object)*): JHashMap[String, Object] = {
+    val kp = new JHashMap[String, Object]()
+    kp.put("bootstrap.servers", kafkaTestUtils.brokerAddress)
+    kp.put("key.deserializer", classOf[StringDeserializer])
+    kp.put("value.deserializer", classOf[StringDeserializer])
+    kp.put("group.id", s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
+    extra.foreach(e => kp.put(e._1, e._2))
+    kp
+  }
+
+  val preferredHosts = PreferConsistent
+
+  test("basic stream receiving with multiple topics and smallest starting offset") {
+    val topics = List("basic1", "basic2", "basic3")
+    val data = Map("a" -> 7, "b" -> 9)
+    topics.foreach { t =>
+      kafkaTestUtils.createTopic(t)
+      kafkaTestUtils.sendMessages(t, data)
+    }
+    val totalSent = data.values.sum * topics.size
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String](
+        ssc, preferredHosts, Subscribe[String, String](topics, kafkaParams.asScala))
+    }
+    val allReceived = new ConcurrentLinkedQueue[(String, String)]()
+
+    // hold a reference to the current offset ranges, so it can be used downstream
+    var offsetRanges = Array[OffsetRange]()
+    val tf = stream.transform { rdd =>
+      // Get the offset ranges in the RDD
+      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+      rdd.map(r => (r.key, r.value))
+    }
+
+    tf.foreachRDD { rdd =>
+      for (o <- offsetRanges) {
+        logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+      }
+      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+        // For each partition, get the size of the offset range in the partition
+        // and the number of items in the partition
+        val off = offsetRanges(i)
+        val all = iter.toSeq
+        val partSize = all.size
+        val rangeSize = off.untilOffset - off.fromOffset
+        Iterator((partSize, rangeSize))
+      }.collect
+
+      // Verify that the number of elements in each partition
+      // matches the size of the corresponding offset range
+      collected.foreach { case (partSize, rangeSize) =>
+        assert(partSize === rangeSize, "offset ranges are wrong")
+      }
+    }
+
+    stream.foreachRDD { rdd =>
+      allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
+    }
+    ssc.start()
+    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+      assert(allReceived.size === totalSent,
+        "didn't get expected number of messages, messages:\n" +
+          allReceived.asScala.mkString("\n"))
+    }
+    ssc.stop()
+  }
+
+  test("receiving from largest starting offset") {
+    val topic = "latest"
+    val topicPartition = new TopicPartition(topic, 0)
+    val data = Map("a" -> 10)
+    kafkaTestUtils.createTopic(topic)
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "latest")
+    val kc = new KafkaConsumer(kafkaParams)
+    kc.assign(Arrays.asList(topicPartition))
+    def getLatestOffset(): Long = {
+      kc.seekToEnd(Arrays.asList(topicPartition))
+      kc.position(topicPartition)
+    }
+
+    // Send some initial messages before starting context
+    kafkaTestUtils.sendMessages(topic, data)
+    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+      assert(getLatestOffset() > 3)
+    }
+    val offsetBeforeStart = getLatestOffset()
+    kc.close()
+
+    // Setup context and kafka stream with largest offset
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val stream = withClue("Error creating direct stream") {
+      val s = new DirectKafkaInputDStream[String, String](
+        ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
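+      // poll(0) makes the consumer join and fetch positions, so position() reflects the starting offset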
+      s.consumer.poll(0)
+      assert(
+        s.consumer.position(topicPartition) >= offsetBeforeStart,
+        "Start offset not from latest"
+      )
+      s
+    }
+
+    val collectedData = new ConcurrentLinkedQueue[String]()
+    stream.map { _.value }.foreachRDD { rdd =>
+      collectedData.addAll(Arrays.asList(rdd.collect(): _*))
+    }
+    ssc.start()
+    val newData = Map("b" -> 10)
+    kafkaTestUtils.sendMessages(topic, newData)
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(collectedData.contains("b"))
+    }
+    assert(!collectedData.contains("a"))
+  }
+
+  test("creating stream by offset") {
+    val topic = "offset"
+    val topicPartition = new TopicPartition(topic, 0)
+    val data = Map("a" -> 10)
+    kafkaTestUtils.createTopic(topic)
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "latest")
+    val kc = new KafkaConsumer(kafkaParams)
+    kc.assign(Arrays.asList(topicPartition))
+    def getLatestOffset(): Long = {
+      kc.seekToEnd(Arrays.asList(topicPartition))
+      kc.position(topicPartition)
+    }
+
+    // Send some initial messages before starting context
+    kafkaTestUtils.sendMessages(topic, data)
+    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+      assert(getLatestOffset() >= 10)
+    }
+    val offsetBeforeStart = getLatestOffset()
+    kc.close()
+
+    // Setup context and kafka stream with largest offset
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val stream = withClue("Error creating direct stream") {
+      val s = new DirectKafkaInputDStream[String, String](ssc, preferredHosts,
+        Assign[String, String](
+          List(topicPartition),
+          kafkaParams.asScala,
+          Map(topicPartition -> 11L)))
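+      // an explicit starting offset of 11 skips all of the initial "a" records (offsets 0 through 9)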
+      s.consumer.poll(0)
+      assert(
+        s.consumer.position(topicPartition) >= offsetBeforeStart,
+        "Start offset not from latest"
+      )
+      s
+    }
+
+    val collectedData = new ConcurrentLinkedQueue[String]()
+    stream.map(_.value).foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
+    ssc.start()
+    val newData = Map("b" -> 10)
+    kafkaTestUtils.sendMessages(topic, newData)
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(collectedData.contains("b"))
+    }
+    assert(!collectedData.contains("a"))
+  }
+
+  // Test to verify the offset ranges can be recovered from the checkpoints
+  test("offset recovery") {
+    val topic = "recovery"
+    kafkaTestUtils.createTopic(topic)
+    testDir = Utils.createTempDir()
+
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+
+    // Send data to Kafka
+    def sendData(data: Seq[Int]) {
+      val strings = data.map { _.toString }
+      kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1 }.toMap)
+    }
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(100))
+    val kafkaStream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String](
+        ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
+    }
+    val keyedStream = kafkaStream.map { r => "key" -> r.value.toInt }
+    val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
+      Some(values.sum + state.getOrElse(0))
+    }
+    ssc.checkpoint(testDir.getAbsolutePath)
+
+    // This is to ensure that all the data is eventually received and each record is counted only once
+    stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
+      rdd.collect().headOption.foreach { x =>
+        DirectKafkaStreamSuite.total.set(x._2)
+      }
+    }
+
+    ssc.start()
+
+    // Send some data
+    for (i <- (1 to 10).grouped(4)) {
+      sendData(i)
+    }
+
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
+    }
+
+    ssc.stop()
+
+    // Verify that offset ranges were generated
+    val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
+    assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+    assert(
+      offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+      "starting offset not zero"
+    )
+
+    logInfo("====== RESTARTING ========")
+
+    // Recover context from checkpoints
+    ssc = new StreamingContext(testDir.getAbsolutePath)
+    val recoveredStream =
+      ssc.graph.getInputStreams().head.asInstanceOf[DStream[ConsumerRecord[String, String]]]
+
+    // Verify offset ranges have been recovered
+    val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => (x._1, x._2.toSet) }
+    assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
+    val earlierOffsetRanges = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+    assert(
+      recoveredOffsetRanges.forall { or =>
+        earlierOffsetRanges.contains((or._1, or._2))
+      },
+      "Recovered ranges are not the same as the ones generated\n" +
+        earlierOffsetRanges + "\n" + recoveredOffsetRanges
+    )
+    // Restart context, send more data and verify the total at the end
+    // If the total is right, it means each record has been received exactly once
+    ssc.start()
+    for (i <- (11 to 20).grouped(4)) {
+      sendData(i)
+    }
+
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
+    }
+    ssc.stop()
+  }
+
+  // Test to verify the offsets can be recovered from Kafka
+  test("offset recovery from kafka") {
+    val topic = "recoveryfromkafka"
+    kafkaTestUtils.createTopic(topic)
+
+    val kafkaParams = getKafkaParams(
+      "auto.offset.reset" -> "earliest",
+      ("enable.auto.commit", false: java.lang.Boolean)
+    )
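+    // auto commit is disabled, so offsets only reach Kafka through the explicit commitAsync below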
+
+    val collectedData = new ConcurrentLinkedQueue[String]()
+    val committed = new JHashMap[TopicPartition, OffsetAndMetadata]()
+
+    // Send data to Kafka and wait for it to be received
+    def sendDataAndWaitForReceive(data: Seq[Int]) {
+      val strings = data.map { _.toString }
+      kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1 }.toMap)
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        assert(strings.forall { collectedData.contains })
+      }
+    }
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(100))
+    withClue("Error creating direct stream") {
+      val kafkaStream = KafkaUtils.createDirectStream[String, String](
+        ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
+      kafkaStream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]], time: Time) =>
+        val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+        val data = rdd.map(_.value).collect()
+        collectedData.addAll(Arrays.asList(data: _*))
+        kafkaStream.asInstanceOf[CanCommitOffsets]
+          .commitAsync(offsets, new OffsetCommitCallback() {
+            def onComplete(m: JMap[TopicPartition, OffsetAndMetadata], e: Exception) {
+              if (null != e) {
+                logError("commit failed", e)
+              } else {
+                committed.putAll(m)
+              }
+            }
+          })
+      }
+    }
+    ssc.start()
+    // Send some data and wait for them to be received
+    for (i <- (1 to 10).grouped(4)) {
+      sendDataAndWaitForReceive(i)
+    }
+    ssc.stop()
+    assert(!committed.isEmpty)
+    val consumer = new KafkaConsumer[String, String](kafkaParams)
+    consumer.subscribe(Arrays.asList(topic))
+    consumer.poll(0)
+    committed.asScala.foreach {
+      case (k, v) =>
+        // commits are async, not exactly once
+        assert(v.offset > 0)
+        assert(consumer.position(k) >= v.offset)
+    }
+  }
+
+  test("Direct Kafka stream report input information") {
+    val topic = "report-test"
+    val data = Map("a" -> 7, "b" -> 9)
+    kafkaTestUtils.createTopic(topic)
+    kafkaTestUtils.sendMessages(topic, data)
+
+    val totalSent = data.values.sum
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+
+    import DirectKafkaStreamSuite._
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val collector = new InputInfoCollector
+    ssc.addStreamingListener(collector)
+
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String](
+        ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
+    }
+
+    val allReceived = new ConcurrentLinkedQueue[(String, String)]
+
+    stream.map(r => (r.key, r.value))
+      .foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
+    ssc.start()
+    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+      assert(allReceived.size === totalSent,
+        "didn't get expected number of messages, messages:\n" +
+          allReceived.asScala.mkString("\n"))
+
+      // Verify the record counts reported to the StreamingListener.
+      assert(collector.numRecordsSubmitted.get() === totalSent)
+      assert(collector.numRecordsStarted.get() === totalSent)
+      assert(collector.numRecordsCompleted.get() === totalSent)
+    }
+    ssc.stop()
+  }
+
+  test("maxMessagesPerPartition with backpressure disabled") {
+    val topic = "maxMessagesPerPartition"
+    val kafkaStream = getDirectKafkaStream(topic, None)
+
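+    // getDirectKafkaStream sets maxRatePerPartition to 100 with a 100 ms batch, so the cap is 10 messages per partition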
+    val input = Map(new TopicPartition(topic, 0) -> 50L, new TopicPartition(topic, 1) -> 50L)
+    assert(kafkaStream.maxMessagesPerPartition(input).get ==
+      Map(new TopicPartition(topic, 0) -> 10L, new TopicPartition(topic, 1) -> 10L))
+  }
+
+  test("maxMessagesPerPartition with no lag") {
+    val topic = "maxMessagesPerPartition"
+    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
+    val kafkaStream = getDirectKafkaStream(topic, rateController)
+
+    val input = Map(new TopicPartition(topic, 0) -> 0L, new TopicPartition(topic, 1) -> 0L)
+    assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
+  }
+
+  test("maxMessagesPerPartition respects max rate") {
+    val topic = "maxMessagesPerPartition"
+    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
+    val kafkaStream = getDirectKafkaStream(topic, rateController)
+
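+    // the rate controller reports 1000 msg/s, but maxRatePerPartition (100) still caps each partition at 10 per 100 ms batch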
+    val input = Map(new TopicPartition(topic, 0) -> 1000L, new TopicPartition(topic, 1) -> 1000L)
+    assert(kafkaStream.maxMessagesPerPartition(input).get ==
+      Map(new TopicPartition(topic, 0) -> 10L, new TopicPartition(topic, 1) -> 10L))
+  }
+
+  test("using rate controller") {
+    val topic = "backpressure"
+    val topicPartitions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+    kafkaTestUtils.createTopic(topic, 2)
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val executorKafkaParams = new JHashMap[String, Object](kafkaParams)
+    KafkaUtils.fixKafkaParams(executorKafkaParams)
+
+    val batchIntervalMilliseconds = 100
+    val estimator = new ConstantEstimator(100)
+    val messages = Map("foo" -> 200)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    val sparkConf = new SparkConf()
+      // A single core is safe even with streaming because the direct API needs no receiver,
+      // and it makes the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) {
+        override protected[streaming] val rateController =
+          Some(new DirectKafkaRateController(id, estimator))
+      }.map(r => (r.key, r.value))
+    }
+
+    val collectedData = new ConcurrentLinkedQueue[Array[String]]()
+
+    // Used for assertion failure messages.
+    def dataToString: String =
+      collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
+
+    // This is to collect the raw data received from Kafka
+    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+      val data = rdd.map { _._2 }.collect()
+      collectedData.add(data)
+    }
+
+    ssc.start()
+
+    // Try different rate limits.
+    // Wait for arrays of data to appear matching the rate.
+    Seq(100, 50, 20).foreach { rate =>
+      collectedData.clear()       // Empty this buffer on each pass.
+      estimator.updateRate(rate)  // Set a new rate.
+      // Expect blocks of data equal to "rate", scaled by the interval length in secs.
+      val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
+      eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
+        // Assert that rate estimator values are used to determine maxMessagesPerPartition.
+        // Funky "-" in message makes the complete assertion message read better.
+        assert(collectedData.asScala.exists(_.size == expectedSize),
+          s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
+      }
+    }
+
+    ssc.stop()
+  }
+
+  /** Get the generated offset ranges from the DirectKafkaStream */
+  private def getOffsetRanges[K, V](
+      kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, Array[OffsetRange])] = {
+    kafkaStream.generatedRDDs.mapValues { rdd =>
+      rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+    }.toSeq.sortBy { _._1 }
+  }
+
+  private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = {
+    val batchIntervalMilliseconds = 100
+
+    val sparkConf = new SparkConf()
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val ekp = new JHashMap[String, Object](kafkaParams)
+    KafkaUtils.fixKafkaParams(ekp)
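+    // fixKafkaParams rewrites the driver params for use on executors (e.g. it disables auto commit)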
+
+    val s = new DirectKafkaInputDStream[String, String](
+      ssc,
+      preferredHosts,
+      new ConsumerStrategy[String, String] {
+        def executorKafkaParams = ekp
+        def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[String, String] = {
+          val consumer = new KafkaConsumer[String, String](kafkaParams)
+          val tps = List(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+          consumer.assign(Arrays.asList(tps: _*))
+          tps.foreach(tp => consumer.seek(tp, 0))
+          consumer
+        }
+      }
+    ) {
+      override protected[streaming] val rateController = mockRateController
+    }
+    // manual start necessary because we aren't consuming the stream, just checking its state
+    s.start()
+    s
+  }
+}
+
+object DirectKafkaStreamSuite {
+  val total = new AtomicLong(-1L)
+
+  class InputInfoCollector extends StreamingListener {
+    val numRecordsSubmitted = new AtomicLong(0L)
+    val numRecordsStarted = new AtomicLong(0L)
+    val numRecordsCompleted = new AtomicLong(0L)
+
+    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
+      numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
+    }
+
+    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
+      numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
+    }
+
+    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+      numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
+    }
+  }
+}
+
+private[streaming] class ConstantEstimator(@volatile private var rate: Long)
+  extends RateEstimator {
+
+  def updateRate(newRate: Long): Unit = {
+    rate = newRate
+  }
+
+  def compute(
+      time: Long,
+      elements: Long,
+      processingDelay: Long,
+      schedulingDelay: Long): Option[Double] = Some(rate)
+}
+
+private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
+  extends RateController(id, estimator) {
+  override def publish(rate: Long): Unit = ()
+  override def getLatestRate(): Long = rate
+}
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
new file mode 100644
index 0000000000..3d2546ddd9
--- /dev/null
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+
+class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var kafkaTestUtils: KafkaTestUtils = _
+
+  private val sparkConf = new SparkConf().setMaster("local[4]")
+    .setAppName(this.getClass.getSimpleName)
+  private var sc: SparkContext = _
+
+  override def beforeAll {
+    sc = new SparkContext(sparkConf)
+    kafkaTestUtils = new KafkaTestUtils
+    kafkaTestUtils.setup()
+  }
+
+  override def afterAll {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown()
+      kafkaTestUtils = null
+    }
+  }
+
+  private def getKafkaParams() = Map[String, Object](
+    "bootstrap.servers" -> kafkaTestUtils.brokerAddress,
+    "key.deserializer" -> classOf[StringDeserializer],
+    "value.deserializer" -> classOf[StringDeserializer],
+    "group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}"
+  ).asJava
+
+  private val preferredHosts = PreferConsistent
+
+  test("basic usage") {
+    val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"
+    kafkaTestUtils.createTopic(topic)
+    val messages = Array("the", "quick", "brown", "fox")
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    val kafkaParams = getKafkaParams()
+
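+    // OffsetRange is a half-open interval [fromOffset, untilOffset), so this range covers every sent message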
+    val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
+
+    val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, preferredHosts)
+      .map(_.value)
+
+    val received = rdd.collect.toSet
+    assert(received === messages.toSet)
+
+    // size-related method optimizations return sane results
+    assert(rdd.count === messages.size)
+    assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+    assert(!rdd.isEmpty)
+    assert(rdd.take(1).size === 1)
+    assert(rdd.take(1).head === messages.head)
+    assert(rdd.take(messages.size + 10).size === messages.size)
+
+    val emptyRdd = KafkaUtils.createRDD[String, String](
+      sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts)
+
+    assert(emptyRdd.isEmpty)
+
+    // invalid offset ranges throw exceptions
+    val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
+    intercept[SparkException] {
+      val result = KafkaUtils.createRDD[String, String](sc, kafkaParams, badRanges, preferredHosts)
+        .map(_.value)
+        .collect()
+    }
+  }
+
+  test("iterator boundary conditions") {
+    // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
+    val topic = s"topicboundary-${Random.nextInt}-${System.currentTimeMillis}"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    kafkaTestUtils.createTopic(topic)
+
+    val kafkaParams = getKafkaParams()
+
+    // this is the "lots of messages" case
+    kafkaTestUtils.sendMessages(topic, sent)
+    var sentCount = sent.values.sum
+
+    val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams,
+      Array(OffsetRange(topic, 0, 0, sentCount)), preferredHosts)
+
+    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+    val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
+
+    assert(rangeCount === sentCount, "offset range didn't include all sent messages")
+    assert(rdd.map(_.offset).collect.sorted === (0 until sentCount).toArray,
+      "didn't get all sent messages")
+
+    // this is the "0 messages" case
+    val rdd2 = KafkaUtils.createRDD[String, String](sc, kafkaParams,
+      Array(OffsetRange(topic, 0, sentCount, sentCount)), preferredHosts)
+
+    // shouldn't get anything, since the message is sent after the rdd was defined
+    val sentOnlyOne = Map("d" -> 1)
+
+    kafkaTestUtils.sendMessages(topic, sentOnlyOne)
+
+    assert(rdd2.map(_.value).collect.size === 0, "got messages when there shouldn't be any")
+
+    // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
+    val rdd3 = KafkaUtils.createRDD[String, String](sc, kafkaParams,
+      Array(OffsetRange(topic, 0, sentCount, sentCount + 1)), preferredHosts)
+
+    // send lots of messages after the rdd was defined; they shouldn't show up
+    kafkaTestUtils.sendMessages(topic, Map("extra" -> 22))
+
+    assert(rdd3.map(_.value).collect.head === sentOnlyOne.keys.head,
+      "didn't get exactly one message")
+  }
+
+  test("executor sorting") {
+    val kafkaParams = new ju.HashMap[String, Object](getKafkaParams())
+    kafkaParams.put("auto.offset.reset", "none")
+    val rdd = new KafkaRDD[String, String](
+      sc,
+      kafkaParams,
+      Array(OffsetRange("unused", 0, 1, 2)),
+      ju.Collections.emptyMap[TopicPartition, String](),
+      true)
+    val a3 = ExecutorCacheTaskLocation("a", "3")
+    val a4 = ExecutorCacheTaskLocation("a", "4")
+    val b1 = ExecutorCacheTaskLocation("b", "1")
+    val b2 = ExecutorCacheTaskLocation("b", "2")
+
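+    // compareExecutors orders by host and then executor id, both descending,
+    // so every permutation should sort back to this single deterministic layout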
+    val correct = Array(b2, b1, a4, a3)
+
+    correct.permutations.foreach { p =>
+      assert(p.sortWith(rdd.compareExecutors) === correct)
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 89ed87ff9e..c99d786b14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,8 @@
     <module>launcher</module>
     <module>external/kafka-0-8</module>
     <module>external/kafka-0-8-assembly</module>
+    <module>external/kafka-0-10</module>
+    <module>external/kafka-0-10-assembly</module>
   </modules>
 
   <properties>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 4c01ad3c33..8e3dcc2f38 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -44,9 +44,9 @@ object BuildCommons {
   ).map(ProjectRef(buildLocation, _))
 
   val streamingProjects@Seq(
-    streaming, streamingFlumeSink, streamingFlume, streamingKafka
+    streaming, streamingFlumeSink, streamingFlume, streamingKafka, streamingKafka010
   ) = Seq(
-    "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-8"
+    "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-8", "streaming-kafka-0-10"
   ).map(ProjectRef(buildLocation, _))
 
   val allProjects@Seq(
@@ -61,8 +61,8 @@ object BuildCommons {
     Seq("yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl",
       "docker-integration-tests").map(ProjectRef(buildLocation, _))
 
-  val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) =
-    Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-0-8-assembly", "streaming-kinesis-asl-assembly")
+  val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =
+    Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-0-8-assembly", "streaming-kafka-0-10-assembly", "streaming-kinesis-asl-assembly")
       .map(ProjectRef(buildLocation, _))
 
   val copyJarsProjects@Seq(assembly, examples) = Seq("assembly", "examples")
@@ -352,7 +352,7 @@ object SparkBuild extends PomBuild {
   val mimaProjects = allProjects.filterNot { x =>
     Seq(
       spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn,
-      unsafe, tags, sketch, mllibLocal
+      unsafe, tags, sketch, mllibLocal, streamingKafka010
     ).contains(x)
   }
 
@@ -608,7 +608,7 @@ object Assembly {
         .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
     },
     jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
-      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-0-8-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
+      if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-0-8-assembly") || mName.contains("streaming-kafka-0-10-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
         // This must match the same name used in maven (see external/kafka-0-8-assembly/pom.xml)
         s"${mName}-${v}.jar"
       } else {
-- 
GitLab