diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 68b99ca1253d5f3b64395f55bedc322c9dbb81b2..3d36761cdabbbceb7980ee49321353627f730aac 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -26,7 +26,9 @@ import org.apache.spark.rdd.RDD
     sure a node doesn't load two copies of an RDD at once.
   */
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
-  private val loading = new HashSet[String]
+
+  /** Keys of RDD splits that are being computed/loaded. */
+  private val loading = new HashSet[String]()
 
   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 912ce752fb383cd83d1204400d24b04f0fce10e5..11e92945ec600f2fcace881121684d079dc645c6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.LocalSparkCluster
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -337,7 +338,11 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+    // Add necessary security credentials to the JobConf before broadcasting it.
+    SparkEnv.get.hadoop.addCredentials(conf)
+    // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it.
+    val confBroadcast = broadcast(new SerializableWritable(conf))
+    new HadoopDatasetRDD(this, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
@@ -347,10 +352,26 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
-      ) : RDD[(K, V)] = {
-    val conf = new JobConf(hadoopConfiguration)
-    FileInputFormat.setInputPaths(conf, path)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+      ): RDD[(K, V)] = {
+    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+    hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+  }
+
+  /**
+   * Get an RDD for a Hadoop file with an arbitrary InputFormat. Accepts a Hadoop Configuration
+   * that has already been broadcast, assuming that it's safe to use it to construct a
+   * HadoopFileRDD (i.e., all configuration properties except the file 'path' can be reused).
+   */
+  def hadoopFile[K, V](
+      path: String,
+      confBroadcast: Broadcast[SerializableWritable[Configuration]],
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minSplits: Int
+      ): RDD[(K, V)] = {
+    new HadoopFileRDD(
+      this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /**
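
The two SparkContext changes above share one goal: serialize the roughly 10 KB Hadoop
configuration once per application instead of once per RDD. A minimal usage sketch of the new
hadoopFile overload, assuming a local driver, hypothetical input paths, and that
sc.hadoopConfiguration and sc.defaultMinSplits are accessible as in this version of SparkContext:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.{SerializableWritable, SparkContext}

    val sc = new SparkContext("local", "broadcast-conf-example")
    // Broadcast the Hadoop configuration once...
    val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
    // ...then build many file RDDs that share it, rather than re-broadcasting an
    // equivalent JobConf for every RDD.
    val rdds = (0 until 100).map { i =>
      sc.hadoopFile(
        "hdfs:///logs/part-%05d".format(i),  // hypothetical paths
        confBroadcast,
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text],
        sc.defaultMinSplits)
    }
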
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 0a5f4c368f3d1ef12374f5842f4bd7b0d0354817..f416b95afbe64970c1f4be3c1024663f58825c7c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -16,6 +16,9 @@
  */
 
 package org.apache.spark.deploy
+
+import com.google.common.collect.MapMaker
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
@@ -24,6 +27,9 @@ import org.apache.hadoop.mapred.JobConf
  * Contains util methods to interact with Hadoop from spark.
  */
 class SparkHadoopUtil {
+  // A general map for metadata needed during HadoopRDD split computation (e.g., HadoopFileRDD uses
+  // this to cache JobConfs).
+  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
   // Returns an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems
   def newConfiguration(): Configuration = new Configuration()
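
The metadata map above holds its values through soft references (Guava's MapMaker.softValues), so
cached JobConfs and InputFormats can be reclaimed by the garbage collector under memory pressure
instead of pinning executor memory. A minimal sketch of those semantics, with an illustrative key
name:

    import com.google.common.collect.MapMaker

    val cache = new MapMaker().softValues().makeMap[String, Any]()
    val jobConfStandIn = new Object  // stands in for a cached JobConf
    cache.put("rdd_0_job_conf", jobConfStandIn)
    // While the value is strongly reachable, lookups hit:
    assert(cache.get("rdd_0_job_conf") != null)
    // After a GC clears the SoftReference, get() returns null and the caller
    // simply recomputes and re-caches the value.
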
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2cb6734e4199c2f50dcef278dbb23691c4cd3621..dd9fc7b79af54b8bd2c502188f1c0c31eb15c7be 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
 
 import java.io.EOFException
 
+import org.apache.hadoop.mapred.FileInputFormat
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -26,10 +27,57 @@ import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils
 
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
+  TaskContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.util.NextIterator
 import org.apache.hadoop.conf.{Configuration, Configurable}
 
+/**
+ * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
+ * system, or S3).
+ * This accepts a general, broadcast Hadoop Configuration, because those tend to remain the same
+ * across multiple reads; the 'path' is the only variable that differs across the new JobConfs
+ * created from the Configuration.
+ */
+class HadoopFileRDD[K, V](
+    sc: SparkContext,
+    path: String,
+    hadoopConfBroadcast: Broadcast[SerializableWritable[Configuration]],
+    inputFormatClass: Class[_ <: InputFormat[K, V]],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int)
+  extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
+
+  private val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
+  override def getJobConf(): JobConf = {
+    if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+    } else {
+      val newJobConf = new JobConf(hadoopConfBroadcast.value.value)
+      FileInputFormat.setInputPaths(newJobConf, path)
+      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+      newJobConf
+    }
+  }
+}
+
+/**
+ * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. tables in HBase).
+ */
+class HadoopDatasetRDD[K, V](
+    sc: SparkContext,
+    confBroadcast: Broadcast[SerializableWritable[JobConf]],
+    inputFormatClass: Class[_ <: InputFormat[K, V]],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int)
+  extends HadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, minSplits) {
+
+  override def getJobConf(): JobConf = confBroadcast.value.value
+}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
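
Both subclasses route their non-serializable, per-executor state through the shared metadata map
with the same check-then-put pattern. The check and the put are not atomic, so two tasks on one
executor can race and each construct a JobConf; since both copies derive from the same broadcast
Configuration, whichever put() lands last is equivalent and the race is benign. The pattern,
sketched as a hypothetical helper that is not part of the patch:

    import org.apache.spark.SparkEnv

    // Check-then-put caching against the executor-local metadata map, as done by
    // getJobConf() and getInputFormat() in this file.
    def cachedMetadata[T](key: String)(compute: => T): T = {
      val cache = SparkEnv.get.hadoop.hadoopJobMetadata
      val existing = cache.get(key)
      if (existing != null) {
        existing.asInstanceOf[T]
      } else {
        val value = compute  // racing tasks may both compute; the last put() wins
        cache.put(key, value)
        value
      }
    }
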
@@ -45,29 +93,41 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 }
 
 /**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
- * system, or S3, tables in HBase, etc).
+ * A base class that provides core functionality for reading data partitions stored in Hadoop.
  */
-class HadoopRDD[K, V](
+abstract class HadoopRDD[K, V](
     sc: SparkContext,
-    @transient conf: JobConf,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
-  // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  private val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+
+  // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+  protected def getJobConf(): JobConf
+
+  def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+    if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
+      return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+    }
+    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+      .asInstanceOf[InputFormat[K, V]]
+    if (newInputFormat.isInstanceOf[Configurable]) {
+      newInputFormat.asInstanceOf[Configurable].setConf(conf)
+    }
+    HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
+    newInputFormat
+  }
 
   override def getPartitions: Array[Partition] = {
-    val env = SparkEnv.get
-    env.hadoop.addCredentials(conf)
-    val inputFormat = createInputFormat(conf)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
     if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(conf)
+      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
     }
-    val inputSplits = inputFormat.getSplits(conf, minSplits)
+    val inputSplits = inputFormat.getSplits(jobConf, minSplits)
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -75,22 +135,14 @@ class HadoopRDD[K, V](
     array
   }
 
-  def createInputFormat(conf: JobConf): InputFormat[K, V] = {
-    ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
-      .asInstanceOf[InputFormat[K, V]]
-  }
-
   override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
     val split = theSplit.asInstanceOf[HadoopPartition]
     logInfo("Input split: " + split.inputSplit)
     var reader: RecordReader[K, V] = null
 
-    val conf = confBroadcast.value.value
-    val fmt = createInputFormat(conf)
-    if (fmt.isInstanceOf[Configurable]) {
-      fmt.asInstanceOf[Configurable].setConf(conf)
-    }
-    reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
+    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
     context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -127,5 +179,14 @@ class HadoopRDD[K, V](
     // Do nothing. Hadoop RDD should not be checkpointed.
   }
 
-  def getConf: Configuration = confBroadcast.value.value
+  def getConf: Configuration = getJobConf()
+}
+
+object HadoopRDD {
+  def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+
+  def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+
+  def putCachedMetadata(key: String, value: Any) =
+    SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
 }
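
For callers of the JobConf-based API nothing changes: SparkContext.hadoopRDD now broadcasts the
conf internally and returns a HadoopDatasetRDD. A minimal sketch, assuming an existing
SparkContext named sc and a hypothetical SequenceFile of LongWritable/Text pairs:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, SequenceFileInputFormat}

    val conf = new JobConf(sc.hadoopConfiguration)
    FileInputFormat.setInputPaths(conf, "hdfs:///data/events")  // hypothetical path
    val rdd = sc.hadoopRDD(
      conf,
      classOf[SequenceFileInputFormat[LongWritable, Text]],
      classOf[LongWritable],
      classOf[Text])
    // The conf is broadcast once inside hadoopRDD; tasks on the executors read it
    // back via HadoopDatasetRDD.getJobConf().
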