diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 68b99ca1253d5f3b64395f55bedc322c9dbb81b2..3d36761cdabbbceb7980ee49321353627f730aac 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -26,7 +26,9 @@ import org.apache.spark.rdd.RDD
     sure a node doesn't load two copies of an RDD at once.
   */
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
-  private val loading = new HashSet[String]
+
+  /** Keys of RDD splits that are being computed/loaded. */
+  private val loading = new HashSet[String]()
 
   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2fb4a530720c29d0f9eca5319a1522f3992aee07..febcf9c6ee7c1b6d84553eca1e265e782e736be7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.LocalSparkCluster
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -83,9 +84,11 @@ class SparkContext(
     val sparkHome: String = null,
     val jars: Seq[String] = Nil,
     val environment: Map[String, String] = Map(),
-    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
-    // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
+    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc)
+    // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
+    // of data-local splits on host
+    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+      scala.collection.immutable.Map())
   extends Logging {
 
   // Ensure logging is initialized before we spawn any threads
@@ -238,7 +241,8 @@ class SparkContext(
     val env = SparkEnv.get
     val conf = env.hadoop.newConfiguration()
     // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
       conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
       conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
       conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
@@ -337,6 +341,8 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
+    // Add necessary security credentials to the JobConf before broadcasting it.
+    SparkEnv.get.hadoop.addCredentials(conf)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
@@ -347,10 +353,27 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
-      ) : RDD[(K, V)] = {
-    val conf = new JobConf(hadoopConfiguration)
-    FileInputFormat.setInputPaths(conf, path)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+      ): RDD[(K, V)] = {
+    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
+    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+    hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+  }
+
+  /**
+   * Get an RDD for a Hadoop file with an arbitray InputFormat. Accept a Hadoop Configuration
+   * that has already been broadcast, assuming that it's safe to use it to construct a
+   * HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be resued).
+   */
+  def hadoopFile[K, V](
+      path: String,
+      confBroadcast: Broadcast[SerializableWritable[Configuration]],
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minSplits: Int
+      ): RDD[(K, V)] = {
+    new HadoopFileRDD(
+      this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 0a5f4c368f3d1ef12374f5842f4bd7b0d0354817..993ba6bd3dbfad6faeed86d8d4f4ef39535dd00e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -16,6 +16,9 @@
  */
 
 package org.apache.spark.deploy
+
+import com.google.common.collect.MapMaker
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
@@ -24,11 +27,16 @@ import org.apache.hadoop.mapred.JobConf
  * Contains util methods to interact with Hadoop from spark.
  */
 class SparkHadoopUtil {
+  // A general, soft-reference map for metadata needed during HadoopRDD split computation
+  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
+  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
-  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop
+  // subsystems
   def newConfiguration(): Configuration = new Configuration()
 
-  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+  // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+  // cluster
   def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2cb6734e4199c2f50dcef278dbb23691c4cd3621..d3b3fffd409d17dfda16aca529596b064b9b1058 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
 
 import java.io.EOFException
 
+import org.apache.hadoop.mapred.FileInputFormat
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -26,10 +27,47 @@ import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils
 
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
+  TaskContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.util.NextIterator
 import org.apache.hadoop.conf.{Configuration, Configurable}
 
+/**
+ * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
+ * system, or S3).
+ * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
+ * across multiple reads; the 'path' is the only variable that is different across new JobConfs
+ * created from the Configuration.
+ */
+class HadoopFileRDD[K, V](
+    sc: SparkContext,
+    path: String,
+    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+    inputFormatClass: Class[_ <: InputFormat[K, V]],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int)
+  extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
+
+  override def getJobConf(): JobConf = {
+    if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
+      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+    } else {
+      // Create a new JobConf, set the input file/directory paths to read from, and cache the
+      // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
+      // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
+      // getJobConf() calls for this RDD in the local process.
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+      val newJobConf = new JobConf(broadcastedConf.value.value)
+      FileInputFormat.setInputPaths(newJobConf, path)
+      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+      return newJobConf
+    }
+  }
+}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
@@ -45,29 +83,80 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 }
 
 /**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
- * system, or S3, tables in HBase, etc).
+ * A base class that provides core functionality for reading data partitions stored in Hadoop.
  */
 class HadoopRDD[K, V](
     sc: SparkContext,
-    @transient conf: JobConf,
+    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
-  // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  def this(
+      sc: SparkContext,
+      conf: JobConf,
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minSplits: Int) = {
+    this(
+      sc,
+      sc.broadcast(new SerializableWritable(conf))
+        .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      minSplits)
+  }
+
+  protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
+  protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+
+  // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+  protected def getJobConf(): JobConf = {
+    val conf: Configuration = broadcastedConf.value.value
+    if (conf.isInstanceOf[JobConf]) {
+      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
+      return conf.asInstanceOf[JobConf]
+    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
+      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+    } else {
+      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+      val newJobConf = new JobConf(broadcastedConf.value.value)
+      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+      return newJobConf
+    }
+  }
+
+  protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+    if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
+      return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+    }
+    // Once an InputFormat for this RDD is created, cache it so that only one reflection call is
+    // done in each local process.
+    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+      .asInstanceOf[InputFormat[K, V]]
+    if (newInputFormat.isInstanceOf[Configurable]) {
+      newInputFormat.asInstanceOf[Configurable].setConf(conf)
+    }
+    HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
+    return newInputFormat
+  }
 
   override def getPartitions: Array[Partition] = {
-    val env = SparkEnv.get
-    env.hadoop.addCredentials(conf)
-    val inputFormat = createInputFormat(conf)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
     if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(conf)
+      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
     }
-    val inputSplits = inputFormat.getSplits(conf, minSplits)
+    val inputSplits = inputFormat.getSplits(jobConf, minSplits)
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -75,22 +164,14 @@ class HadoopRDD[K, V](
     array
   }
 
-  def createInputFormat(conf: JobConf): InputFormat[K, V] = {
-    ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
-      .asInstanceOf[InputFormat[K, V]]
-  }
-
   override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
     val split = theSplit.asInstanceOf[HadoopPartition]
     logInfo("Input split: " + split.inputSplit)
     var reader: RecordReader[K, V] = null
 
-    val conf = confBroadcast.value.value
-    val fmt = createInputFormat(conf)
-    if (fmt.isInstanceOf[Configurable]) {
-      fmt.asInstanceOf[Configurable].setConf(conf)
-    }
-    reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
+    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
     context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -127,5 +208,18 @@ class HadoopRDD[K, V](
     // Do nothing. Hadoop RDD should not be checkpointed.
   }
 
-  def getConf: Configuration = confBroadcast.value.value
+  def getConf: Configuration = getJobConf()
+}
+
+private[spark] object HadoopRDD {
+  /**
+   * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
+   * the local process.
+   */
+  def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+
+  def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+
+  def putCachedMetadata(key: String, value: Any) =
+    SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
 }