diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index f416b95afbe64970c1f4be3c1024663f58825c7c..993ba6bd3dbfad6faeed86d8d4f4ef39535dd00e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -27,14 +27,16 @@ import org.apache.hadoop.mapred.JobConf
  * Contains util methods to interact with Hadoop from spark.
  */
 class SparkHadoopUtil {
-  // A general map for metadata needed during HadoopRDD split computation (e.g., HadoopFileRDD uses
-  // this to cache JobConfs).
+  // A general, soft-reference map for metadata needed during HadoopRDD split computation
+  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
-  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+  // Return an appropriate (subclass) of Configuration. Creating a config can initialize some
+  // hadoop subsystems
   def newConfiguration(): Configuration = new Configuration()
 
-  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+  // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+  // cluster
   def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 51e5bb88d2b3c87d5d386f487bfbc76fd36a840f..d3b3fffd409d17dfda16aca529596b064b9b1058 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -52,8 +52,15 @@ class HadoopFileRDD[K, V](
 
   override def getJobConf(): JobConf = {
     if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
     } else {
+      // Create a new JobConf, set the input file/directory paths to read from, and cache the
+      // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
+      // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
+      // getJobConf() calls for this RDD in the local process.
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
       val newJobConf = new JobConf(broadcastedConf.value.value)
       FileInputFormat.setInputPaths(newJobConf, path)
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
@@ -112,10 +119,16 @@ class HadoopRDD[K, V](
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
     if (conf.isInstanceOf[JobConf]) {
+      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
       return conf.asInstanceOf[JobConf]
     } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
     } else {
+      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
       val newJobConf = new JobConf(broadcastedConf.value.value)
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
       return newJobConf
@@ -126,6 +139,8 @@ class HadoopRDD[K, V](
     if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
       return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
     }
+    // Once an InputFormat for this RDD is created, cache it so that only one reflection call is
+    // done in each local process.
     val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
       .asInstanceOf[InputFormat[K, V]]
     if (newInputFormat.isInstanceOf[Configurable]) {
@@ -197,6 +212,10 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD {
+  /**
+   * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
+   * the local process.
+   */
   def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
 
   def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
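For reference, the soft-reference caching pattern this patch relies on can be reduced to a small standalone Scala sketch. It assumes only Guava (MapMaker) and Hadoop (Configuration, JobConf) on the classpath; MetadataCache and getOrCreateJobConf are illustrative names, not Spark APIs, and the per-RDD cache key handling (jobConfCacheKey) is glossed over:

import java.util.concurrent.ConcurrentMap

import com.google.common.collect.MapMaker
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

object MetadataCache {
  // Values are held via soft references, so cached JobConfs can be reclaimed under memory
  // pressure instead of accumulating in the process heap for its whole lifetime.
  private val cache: ConcurrentMap[String, AnyRef] =
    new MapMaker().softValues().makeMap[String, AnyRef]()

  def get(key: String): Option[AnyRef] = Option(cache.get(key))

  def put(key: String, value: AnyRef): Unit = cache.put(key, value)

  // Get-or-create, mirroring getJobConf() above: reuse the locally cached JobConf when one
  // exists, otherwise build it once from the broadcast Configuration and cache it. A JobConf can
  // carry ~10KB of temporary objects, so avoiding one copy per call helps minimize GC.
  def getOrCreateJobConf(cacheKey: String, broadcastConf: Configuration): JobConf = {
    get(cacheKey) match {
      case Some(conf: JobConf) => conf
      case _ =>
        val newJobConf = new JobConf(broadcastConf)
        put(cacheKey, newJobConf)
        newJobConf
    }
  }
}

As in the patch, the put is a plain last-writer-wins insert: if two threads race through getOrCreateJobConf they may briefly build two JobConfs, which is harmless because both are built from the same broadcast configuration.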
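The InputFormat hunk follows the same pattern, with the extra wrinkle that the reflectively created instance may need the JobConf injected through Configurable. A sketch of that step, reusing the hypothetical MetadataCache object from the previous example (again not Spark's actual API; the real code keys the cache with inputFormatCacheKey):

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.util.ReflectionUtils

object InputFormatCache {
  // Instantiating an InputFormat through reflection is done at most once per cache key in each
  // local process; later calls are served from the soft-reference map.
  def getOrCreateInputFormat[K, V](
      cacheKey: String,
      inputFormatClass: Class[_],
      conf: JobConf): InputFormat[K, V] = {
    MetadataCache.get(cacheKey) match {
      case Some(cached) => cached.asInstanceOf[InputFormat[K, V]]
      case None =>
        val newInputFormat = ReflectionUtils.newInstance(inputFormatClass, conf)
          .asInstanceOf[InputFormat[K, V]]
        // Some InputFormats need the configuration handed to them explicitly.
        newInputFormat match {
          case configurable: Configurable => configurable.setConf(conf)
          case _ =>
        }
        MetadataCache.put(cacheKey, newInputFormat)
        newInputFormat
    }
  }
}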