Commit 7d06bdde authored by Harvey Feng

Merge HadoopDatasetRDD into HadoopRDD.

parent 41708571
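In short: the JobConf-based HadoopDatasetRDD is removed, HadoopRDD stops being abstract and takes a broadcast Hadoop Configuration in its primary constructor, a new auxiliary constructor broadcasts a plain JobConf on the caller's behalf, and getJobConf() now decides at runtime whether the broadcast value is already a JobConf or needs to be wrapped in (and cached as) one.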
@@ -332,17 +332,15 @@ class SparkContext(
    * etc).
    */
   def hadoopRDD[K, V](
-      conf: JobConf,
+      jobConf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
-    SparkEnv.get.hadoop.addCredentials(conf)
-    // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
-    val confBroadcast = broadcast(new SerializableWritable(conf))
-    new HadoopDatasetRDD(this, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+    SparkEnv.get.hadoop.addCredentials(jobConf)
+    new HadoopRDD(this, jobConf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
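With this hunk, hadoopRDD() hands the JobConf straight to HadoopRDD; the broadcast that used to happen here now lives in HadoopRDD's auxiliary constructor (see the last hunk). A minimal usage sketch of the call site — the local master, input path, and text key/value types are illustrative assumptions, not part of this commit:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
    import org.apache.spark.SparkContext

    object HadoopRddUsage {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "HadoopRddUsage")
        // A hand-built JobConf, as a caller of hadoopRDD() would supply (e.g. for HBase);
        // the HDFS path below is a placeholder.
        val jobConf = new JobConf()
        FileInputFormat.setInputPaths(jobConf, "hdfs:///tmp/input")
        // hadoopRDD() adds security credentials to the JobConf and returns a HadoopRDD,
        // which broadcasts the JobConf via its new auxiliary constructor.
        val records = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
          classOf[LongWritable], classOf[Text])
        println(records.count())
        sc.stop()
      }
    }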
@@ -353,6 +351,7 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
+    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
     hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
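hadoopFile() remains the path-based entry point: it broadcasts the SparkContext's hadoopConfiguration itself (the comment added here documents that broadcast) and lets HadoopFileRDD turn it into a JobConf on the workers. A short sketch of that path, wrapped in a helper so it compiles on its own; the helper name and placeholder path are assumptions:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    object HadoopFileUsage {
      // Reads a text file through the mapred API; the caller supplies the SparkContext and path.
      def lines(sc: SparkContext, path: String): RDD[String] =
        // hadoopFile() broadcasts sc.hadoopConfiguration (the roughly 10 KB object the new
        // comment refers to) and passes the broadcast on to HadoopFileRDD.
        sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
          .map(pair => pair._2.toString)
    }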
...
@@ -43,20 +43,18 @@ import org.apache.hadoop.conf.{Configuration, Configurable}
 class HadoopFileRDD[K, V](
     sc: SparkContext,
     path: String,
-    hadoopConfBroadcast: Broadcast[SerializableWritable[Configuration]],
+    confBroadcast: Broadcast[SerializableWritable[Configuration]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
-  extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
+  extends HadoopRDD[K, V](sc, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits) {
 
-  private val jobConfCacheKey = "rdd_%d_job_conf".format(id)
-
   override def getJobConf(): JobConf = {
     if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
     } else {
-      val newJobConf = new JobConf(hadoopConfBroadcast.value.value)
+      val newJobConf = new JobConf(confBroadcast.value.value)
       FileInputFormat.setInputPaths(newJobConf, path)
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
       return newJobConf
@@ -64,21 +62,6 @@ class HadoopFileRDD[K, V](
   }
 }
 
-/**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. tables in HBase).
- */
-class HadoopDatasetRDD[K, V](
-    sc: SparkContext,
-    confBroadcast: Broadcast[SerializableWritable[JobConf]],
-    inputFormatClass: Class[_ <: InputFormat[K, V]],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int)
-  extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
-
-  override def getJobConf(): JobConf = confBroadcast.value.value
-}
-
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
  */
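With HadoopDatasetRDD gone, code that built it directly from a broadcast JobConf instead constructs a HadoopRDD and lets the auxiliary constructor (added in the next hunk) do the broadcast. A hedged sketch of the replacement construction; the two-split minimum and text key/value types are arbitrary choices for illustration:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.HadoopRDD

    object DatasetStyleConstruction {
      // Was: new HadoopDatasetRDD(sc, sc.broadcast(new SerializableWritable(jobConf)), ...)
      // Now: pass the JobConf as-is; HadoopRDD broadcasts it internally.
      def fromJobConf(sc: SparkContext, jobConf: JobConf): HadoopRDD[LongWritable, Text] =
        new HadoopRDD(sc, jobConf, classOf[TextInputFormat],
          classOf[LongWritable], classOf[Text], 2)
    }

In user code, the sc.hadoopRDD wrapper shown earlier remains the usual route; direct construction mainly matters for callers of the deleted class.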
@@ -95,18 +78,49 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
 /**
  * A base class that provides core functionality for reading data partitions stored in Hadoop.
  */
-abstract class HadoopRDD[K, V](
+class HadoopRDD[K, V](
     sc: SparkContext,
+    confBroadcast: Broadcast[SerializableWritable[Configuration]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
+  def this(
+      sc: SparkContext,
+      jobConf: JobConf,
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minSplits: Int) = {
+    this(
+      sc,
+      sc.broadcast(new SerializableWritable(jobConf))
+        .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      minSplits)
+  }
+
+  protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
   private val inputFormatCacheKey = "rdd_%d_input_format".format(id)
 
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
-  protected def getJobConf(): JobConf
+  protected def getJobConf(): JobConf = {
+    val conf: Configuration = confBroadcast.value.value
+    if (conf.isInstanceOf[JobConf]) {
+      return conf.asInstanceOf[JobConf]
+    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+    } else {
+      val newJobConf = new JobConf(confBroadcast.value.value)
+      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+      return newJobConf
+    }
+  }
+
   def getInputFormat(conf: JobConf): InputFormat[K, V] = {
     if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
...
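The cast in the auxiliary constructor works because JobConf extends Configuration, so a broadcast built from a JobConf can be treated as a Broadcast[SerializableWritable[Configuration]]; getJobConf() then recovers it with a runtime isInstanceOf[JobConf] check instead of wrapping it in a fresh JobConf. A simplified, standalone mirror of that dispatch (it omits the per-RDD metadata cache shown in the hunk; the object and method names are made up for illustration):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf

    object JobConfDispatchSketch {
      // Same branch structure as the new getJobConf(): reuse a broadcast JobConf directly,
      // otherwise wrap the plain Configuration in a new JobConf.
      def resolve(conf: Configuration): JobConf =
        if (conf.isInstanceOf[JobConf]) conf.asInstanceOf[JobConf] else new JobConf(conf)

      def main(args: Array[String]) {
        val jc = new JobConf()
        assert(resolve(jc) eq jc)               // an existing JobConf is reused, not rebuilt
        val conf = new Configuration()
        conf.set("sample.key", "sample.value")  // hypothetical property, for illustration only
        assert(resolve(conf).get("sample.key") == "sample.value") // wrapping keeps its settings
      }
    }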