diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 270d8d4b2d48cbb7b78d3dd268f5d1afa0214efe..ff2df8fb6a2fc7256e40b161f4e3a4bd054a3731 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -31,6 +31,7 @@
 import org.apache.spark.serializer.{Serializer, SerializerManager}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.api.python.PythonWorkerFactory
+import com.google.common.collect.MapMaker
 
 /**
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -57,6 +58,10 @@ class SparkEnv (
 
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
+  // A general, soft-reference map for metadata needed during HadoopRDD split computation
+  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
+  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
+
   def stop() {
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
     httpFileServer.stop()
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 8c0c111d618fb44cbd88743cb5f1c315d1ed1a5c..6bc846aa92eb4440bd0c61a20d9f7a2aaf2467b7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -20,19 +20,13 @@ package org.apache.spark.deploy
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
-import com.google.common.collect.MapMaker
-
 import org.apache.spark.SparkException
 
-
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
 private[spark]
 class SparkHadoopUtil {
-  // A general, soft-reference map for metadata needed during HadoopRDD split computation
-  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
-  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -47,7 +41,6 @@ class SparkHadoopUtil {
   def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }
-
 }
 
 object SparkHadoopUtil {
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 0e8eaf4be65edea86fc1f5b0baa50b7d2c2422c9..32901a508f53b34d44c01fc631e9927680bd312a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -199,10 +199,10 @@ private[spark] object HadoopRDD {
    * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
    * the local process.
    */
-  def getCachedMetadata(key: String) = SparkHadoopUtil.get.hadoopJobMetadata.get(key)
+  def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key)
 
-  def containsCachedMetadata(key: String) = SparkHadoopUtil.get.hadoopJobMetadata.containsKey(key)
+  def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key)
 
   def putCachedMetadata(key: String, value: Any) =
-    SparkHadoopUtil.get.hadoopJobMetadata.put(key, value)
+    SparkEnv.get.hadoopJobMetadata.put(key, value)
 }
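
For context, below is a minimal, self-contained sketch of the caching pattern this diff relocates into `SparkEnv`: a Guava `MapMaker` soft-values map keyed by strings, consulted before recomputing Hadoop job metadata. The `MetadataCacheSketch` object, the `getOrCreateJobConf` helper, and the `"rdd_%d_job_conf"` key format are illustrative assumptions for this sketch, not code taken from the patch.

```scala
import com.google.common.collect.MapMaker
import org.apache.hadoop.mapred.JobConf

object MetadataCacheSketch {
  // Soft values let the JVM reclaim cached JobConfs/InputFormats under memory pressure,
  // while the underlying ConcurrentMap remains safe to share across task threads.
  private val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

  // Hypothetical helper mirroring the get/put usage in HadoopRDD's cached-metadata accessors.
  def getOrCreateJobConf(rddId: Int): JobConf = {
    val key = "rdd_%d_job_conf".format(rddId)  // illustrative key format
    hadoopJobMetadata.get(key) match {
      case conf: JobConf => conf               // cache hit: reuse the previously built JobConf
      case _ =>
        val newConf = new JobConf()            // cache miss: build a fresh conf and remember it
        hadoopJobMetadata.put(key, newConf)
        newConf
    }
  }
}
```

Keeping the map on `SparkEnv` rather than `SparkHadoopUtil` ties its lifetime to the running Spark instance, so cached entries are dropped when the environment is torn down rather than lingering in a process-wide utility object.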