From 7d06bdde1d1364dcbef67079b23f6e9777a2de2e Mon Sep 17 00:00:00 2001
From: Harvey Feng <harvey@databricks.com>
Date: Sat, 28 Sep 2013 18:32:41 -0700
Subject: [PATCH] Merge HadoopDatasetRDD into HadoopRDD.

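HadoopDatasetRDD differed from HadoopRDD only in how it obtained its JobConf, so fold
it into the base class.

HadoopRDD is no longer abstract: its primary constructor takes a broadcast
SerializableWritable[Configuration], and a new secondary constructor accepts a plain
JobConf and broadcasts it itself. SparkContext.hadoopRDD now adds security credentials
to the JobConf and passes it straight to HadoopRDD instead of constructing a
HadoopDatasetRDD around a manually created broadcast.

getJobConf() gets a default implementation: if the broadcast Configuration is already a
JobConf it is returned as-is; otherwise a JobConf is built from it and cached under a
per-RDD key. HadoopFileRDD keeps its override, which additionally sets the input path,
and now reuses the jobConfCacheKey that moved into HadoopRDD as a protected val.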
---
 .../scala/org/apache/spark/SparkContext.scala |  9 ++-
 .../org/apache/spark/rdd/HadoopRDD.scala      | 58 ++++++++++++-------
 2 files changed, 40 insertions(+), 27 deletions(-)

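Notes (not part of the patch): a rough sketch of the two construction paths the merged
HadoopRDD supports after this change. `sc` is an existing SparkContext and `jobConf` a
configured JobConf; the input format, key, and value classes below are placeholder
examples, not something this patch prescribes.

    // Assumed imports (sketch only):
    //   org.apache.hadoop.mapred.{JobConf, TextInputFormat}
    //   org.apache.hadoop.io.{LongWritable, Text}
    //   org.apache.spark.{SparkContext, SerializableWritable}
    //   org.apache.spark.rdd.HadoopRDD

    // Path 1: from a JobConf, as SparkContext.hadoopRDD now does. The secondary
    // constructor broadcasts the JobConf (viewed as a Configuration) itself.
    val rddFromJobConf =
      new HadoopRDD(sc, jobConf, classOf[TextInputFormat], classOf[LongWritable],
        classOf[Text], sc.defaultMinSplits)

    // Path 2: from an already-broadcast Configuration, as hadoopFile does via
    // HadoopFileRDD. Here getJobConf() wraps the Configuration in a new JobConf
    // and caches it under the per-RDD jobConfCacheKey.
    val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
    val rddFromConf =
      new HadoopRDD(sc, confBroadcast, classOf[TextInputFormat], classOf[LongWritable],
        classOf[Text], sc.defaultMinSplits)
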
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 11e92945ec..ada1037bd6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -332,17 +332,15 @@ class SparkContext(
    * etc).
    */
   def hadoopRDD[K, V](
-      conf: JobConf,
+      jobConf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
-    SparkEnv.get.hadoop.addCredentials(conf)
-    // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
-    val confBroadcast = broadcast(new SerializableWritable(conf))
-    new HadoopDatasetRDD(this, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+    SparkEnv.get.hadoop.addCredentials(jobConf)
+    new HadoopRDD(this, jobConf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
@@ -353,6 +351,7 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
+    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
     hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index dd9fc7b79a..404532dad4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -43,20 +43,18 @@ import org.apache.hadoop.conf.{Configuration, Configurable}
 class HadoopFileRDD[K, V](
     sc: SparkContext,
     path: String,
-    hadoopConfBroadcast: Broadcast[SerializableWritable[Configuration]],
+    confBroadcast: Broadcast[SerializableWritable[Configuration]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
-  extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
-
-  private val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+  extends HadoopRDD[K, V](sc, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits) {
 
   override def getJobConf(): JobConf = {
     if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
     } else {
-      val newJobConf = new JobConf(hadoopConfBroadcast.value.value)
+      val newJobConf = new JobConf(confBroadcast.value.value)
       FileInputFormat.setInputPaths(newJobConf, path)
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
       return newJobConf
@@ -64,21 +62,6 @@ class HadoopFileRDD[K, V](
   }
 }
 
-/**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. tables in HBase).
- */
-class HadoopDatasetRDD[K, V](
-    sc: SparkContext,
-    confBroadcast: Broadcast[SerializableWritable[JobConf]],
-    inputFormatClass: Class[_ <: InputFormat[K, V]],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int)
-  extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
-
-  override def getJobConf(): JobConf = confBroadcast.value.value
-}
-
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
  */
@@ -95,18 +78,49 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 /**
  * A base class that provides core functionality for reading data partitions stored in Hadoop.
  */
-abstract class HadoopRDD[K, V](
+class HadoopRDD[K, V](
     sc: SparkContext,
+    confBroadcast: Broadcast[SerializableWritable[Configuration]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
+  def this(
+      sc: SparkContext,
+      jobConf: JobConf,
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minSplits: Int) = {
+    this(
+      sc,
+      sc.broadcast(new SerializableWritable(jobConf))
+        .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      minSplits)
+  }
+
+  protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
   private val inputFormatCacheKey = "rdd_%d_input_format".format(id)
 
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
-  protected def getJobConf(): JobConf
+  protected def getJobConf(): JobConf = {
+    val conf: Configuration = confBroadcast.value.value
+    if (conf.isInstanceOf[JobConf]) {
+      return conf.asInstanceOf[JobConf]
+    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+    } else {
+      val newJobConf = new JobConf(confBroadcast.value.value)
+      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+      return newJobConf
+    }
+  }
 
   def getInputFormat(conf: JobConf): InputFormat[K, V] = {
     if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
-- 
GitLab