diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index fb58ed789887f7958a64a38c7bfe859c6ad0cfcf..c3c799375bbeb3ae6822e78dab4fe8f87122dc0c 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -34,7 +34,7 @@ import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
 
 /**
- *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ *  A stream of [[KafkaRDD]] where
  * each given Kafka topic/partition corresponds to an RDD partition.
  * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
  *  of messages
@@ -43,7 +43,7 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
  * and this DStream is not responsible for committing offsets,
  * so that you can control exactly-once semantics.
  * For an easy interface to Kafka-managed offsets,
- *  see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+ *  see [[KafkaCluster]]
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
  * configuration parameters</a>.
  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
@@ -132,7 +132,7 @@ class DirectKafkaInputDStream[
       if (retries <= 0) {
         throw new SparkException(err)
       } else {
-        log.error(err)
+        logError(err)
         Thread.sleep(kc.config.refreshLeaderBackoffMs)
         latestLeaderOffsets(retries - 1)
       }
@@ -194,7 +194,7 @@ class DirectKafkaInputDStream[
       data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
     }
 
-    override def update(time: Time) {
+    override def update(time: Time): Unit = {
       batchForTime.clear()
       generatedRDDs.foreach { kv =>
         val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
@@ -202,9 +202,9 @@ class DirectKafkaInputDStream[
       }
     }
 
-    override def cleanup(time: Time) { }
+    override def cleanup(time: Time): Unit = { }
 
-    override def restore() {
+    override def restore(): Unit = {
       // this is assuming that the topics don't change during execution, which is true currently
       val topics = fromOffsets.keySet
       val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
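
For orientation, a minimal usage sketch of the direct stream described in the Scaladoc above; the master, broker address, topic name, batch interval, and rate limit are illustrative assumptions, not values taken from this patch:

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("direct-kafka-sketch")
      .setMaster("local[2]")  // assumption: local run for illustration only
      // Upper bound on messages per second read from each Kafka partition.
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    val ssc = new StreamingContext(conf, Seconds(2))

    // The direct API requires "metadata.broker.list" or "bootstrap.servers".
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    // One RDD partition per Kafka topic/partition; offsets are tracked by the
    // stream itself rather than committed to Kafka.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
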
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index d4881b140df3c6c48f8242dadfb1f23041cebe2b..2b925774a2f7f26d2785b9f725b972ce30f7fb8a 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -129,7 +129,7 @@ class KafkaRDD[
     val part = thePart.asInstanceOf[KafkaRDDPartition]
     assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
     if (part.fromOffset == part.untilOffset) {
-      log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+      logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
         s"skipping ${part.topic} ${part.partition}")
       Iterator.empty
     } else {
@@ -137,13 +137,16 @@ class KafkaRDD[
     }
   }
 
+  /**
+   * An iterator that fetches messages directly from Kafka for the offsets in the given partition.
+   */
   private class KafkaRDDIterator(
       part: KafkaRDDPartition,
       context: TaskContext) extends NextIterator[R] {
 
     context.addTaskCompletionListener{ context => closeIfNeeded() }
 
-    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
 
     val kc = new KafkaCluster(kafkaParams)
@@ -177,7 +180,7 @@ class KafkaRDD[
         val err = resp.errorCode(part.topic, part.partition)
         if (err == ErrorMapping.LeaderNotAvailableCode ||
           err == ErrorMapping.NotLeaderForPartitionCode) {
-          log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
+          logError(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
             s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
           Thread.sleep(kc.config.refreshLeaderBackoffMs)
         }
diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index fa6b0dbc8c2197b4087dd717246966a366913571..71404a7331ec4602c0eed9a761fa226030f9dd11 100644
--- a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -135,11 +135,6 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
           @Override
           public void call(JavaRDD<String> rdd) {
             result.addAll(rdd.collect());
-            for (OffsetRange o : offsetRanges.get()) {
-              System.out.println(
-                o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
-              );
-            }
           }
         }
     );
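
The deleted println loop above exercised the offset-range bookkeeping that the direct stream exposes. The usual Scala shape of that pattern, shown here only for reference (assuming `stream` is a direct stream like the one in the earlier sketch), is:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Must be applied to the RDDs produced directly by the Kafka stream,
// before any transformation discards the underlying KafkaRDD type.
stream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}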