diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 158197ae4d02239cf3afb4da01a7d017118a154e..ade75e20d511264ac5f01e1f54b6c3a0299e2e9a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
-import org.apache.spark.deploy.LocalSparkCluster
+import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -245,7 +245,7 @@ class SparkContext(
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = {
     val env = SparkEnv.get
-    val conf = env.hadoop.newConfiguration()
+    val conf = SparkHadoopUtil.get.newConfiguration()
     // Explicitly check for S3 environment variables
     if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
         System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
@@ -379,7 +379,7 @@ class SparkContext(
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
-    SparkEnv.get.hadoop.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(conf)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
@@ -698,7 +698,7 @@ class SparkContext(
         key = uri.getScheme match {
           // A JAR file which exists only on the driver node
           case null | "file" =>
-            if (env.hadoop.isYarnMode()) {
+            if (SparkHadoopUtil.get.isYarnMode()) {
               // In order for this to work on yarn the user must specify the --addjars option to
               // the client to upload the file into the distributed cache to make it show up in the
               // current working directory.
@@ -936,9 +936,8 @@ class SparkContext(
    * prevent accidental overriding of checkpoint files in the existing directory.
    */
   def setCheckpointDir(dir: String, useExisting: Boolean = false) {
-    val env = SparkEnv.get
     val path = new Path(dir)
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     if (!useExisting) {
       if (fs.exists(path)) {
         throw new Exception("Checkpoint directory '" + path + "' already exists.")
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index aaab717bcf1eeee98a441b517e7cc2a9c34d0d7b..ff2df8fb6a2fc7256e40b161f4e3a4bd054a3731 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -25,13 +25,13 @@ import akka.remote.RemoteActorRefProvider
 
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster}
 import org.apache.spark.network.ConnectionManager
 import org.apache.spark.serializer.{Serializer, SerializerManager}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.api.python.PythonWorkerFactory
 
+import com.google.common.collect.MapMaker
 
 /**
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -58,18 +58,9 @@ class SparkEnv (
 
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
-  val hadoop = {
-    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
-    if(yarnMode) {
-      try {
-        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
-      } catch {
-        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
-      }
-    } else {
-      new SparkHadoopUtil
-    }
-  }
+  // A general, soft-reference map for metadata needed during HadoopRDD split computation
+  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
+  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
   def stop() {
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
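
The hadoopJobMetadata field that replaces the hadoop val is a Guava MapMaker map with soft-referenced values, so cached JobConfs and InputFormats can be reclaimed by the GC under memory pressure. A minimal standalone sketch of the same construction, with an illustrative key and value:

    import com.google.common.collect.MapMaker

    object SoftValueCacheSketch {
      // Concurrent map whose values are held through soft references: entries may be
      // dropped when the JVM runs low on memory, which is fine for recomputable metadata.
      private val cache = new MapMaker().softValues().makeMap[String, Any]()

      def main(args: Array[String]) {
        cache.put("example-key", new java.util.Properties())   // illustrative entry
        println(cache.containsKey("example-key"))               // normally true
      }
    }
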
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 83cd3df5fa20e5fa2b6b96803cff19a082a38b13..6bc846aa92eb4440bd0c61a20d9f7a2aaf2467b7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -20,17 +20,13 @@ package org.apache.spark.deploy
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
-import com.google.common.collect.MapMaker
-
+import org.apache.spark.SparkException
 
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
 private[spark]
 class SparkHadoopUtil {
-  // A general, soft-reference map for metadata needed during HadoopRDD split computation
-  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
-  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -45,5 +41,23 @@ class SparkHadoopUtil {
   def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }
-
+}
+
+object SparkHadoopUtil {
+  private val hadoop = {
+    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+    if (yarnMode) {
+      try {
+        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+      } catch {
+        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+      }
+    } else {
+      new SparkHadoopUtil
+    }
+  }
+
+  def get: SparkHadoopUtil = {
+    hadoop
+  }
 }
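
Call sites that previously went through SparkEnv.get.hadoop now use this companion object, which lazily selects the YARN subclass by reflection when SPARK_YARN_MODE is set. A hedged sketch of the new call-site pattern; it sits under org.apache.spark because the helper class is private[spark], and the path is a local one so the sketch runs without a cluster:

    package org.apache.spark

    import org.apache.hadoop.fs.Path
    import org.apache.spark.deploy.SparkHadoopUtil

    object HadoopUtilCallSiteSketch {
      def main(args: Array[String]) {
        // Before this patch: path.getFileSystem(SparkEnv.get.hadoop.newConfiguration())
        // After: no SparkEnv is needed; the singleton resolves the right helper once.
        val path = new Path("file:///tmp")   // local path, illustrative
        val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
        println(fs.getUri)
      }
    }
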
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index ccaaecb85b03c6d07bc6a29a0f08868620641898..d3033ea4a627b3bdc3e878488da88fcc9eb43b5f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.rdd
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{NullWritable, BytesWritable}
@@ -83,7 +84,7 @@ private[spark] object CheckpointRDD extends Logging {
   def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
     val env = SparkEnv.get
     val outputDir = new Path(path)
-    val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
+    val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
 
     val finalOutputName = splitIdToFile(ctx.partitionId)
     val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -122,7 +123,7 @@ private[spark] object CheckpointRDD extends Logging {
 
   def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
     val env = SparkEnv.get
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
@@ -145,7 +146,7 @@ private[spark] object CheckpointRDD extends Logging {
     val sc = new SparkContext(cluster, "CheckpointRDD Test")
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index fad042c7ae184daf60d14717bd52a93fed399c26..32901a508f53b34d44c01fc631e9927680bd312a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
 import org.apache.hadoop.conf.{Configuration, Configurable}
 
@@ -198,10 +199,10 @@ private[spark] object HadoopRDD {
    * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
    * the local process.
    */
-  def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+  def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key)
 
-  def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+  def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key)
 
   def putCachedMetadata(key: String, value: Any) =
-    SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
+    SparkEnv.get.hadoopJobMetadata.put(key, value)
 }
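
These helpers now read and write the per-process hadoopJobMetadata map on SparkEnv shown earlier, so they require a live SparkEnv. A rough sketch of caching and retrieving a JobConf through them; the cache key below is made up and is not HadoopRDD's real key scheme:

    package org.apache.spark.rdd

    import org.apache.hadoop.mapred.JobConf

    object JobConfCacheSketch {
      private val key = "sketch_rdd_0_job_conf"   // illustrative key only

      // Cache a JobConf under the key unless one is already present.
      def cache(conf: JobConf) {
        if (!HadoopRDD.containsCachedMetadata(key)) {
          HadoopRDD.putCachedMetadata(key, conf)
        }
      }

      // Look the JobConf back up; None if it was never cached or was collected.
      def lookup(): Option[JobConf] =
        Option(HadoopRDD.getCachedMetadata(key)).map(_.asInstanceOf[JobConf])
    }
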
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 370ccd183c37849ce269798d9aa4a0a891368343..1791ee660db0212ca7afc4dbc99cb18639087009 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
 import scala.collection.immutable.Set
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.hadoop.security.UserGroupInformation
@@ -87,9 +88,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
 
   // This method does not expect failures, since validate has already passed ...
   private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
-    val env = SparkEnv.get
     val conf = new JobConf(configuration)
-    env.hadoop.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(conf)
     FileInputFormat.setInputPaths(conf, path)
 
     val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -108,9 +108,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
 
   // This method does not expect failures, since validate has already passed ...
   private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
-    val env = SparkEnv.get
     val jobConf = new JobConf(configuration)
-    env.hadoop.addCredentials(jobConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
     FileInputFormat.setInputPaths(jobConf, path)
 
     val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a3b3968c5e451ce09c4794e916d6c3dc396c95da..fd2811e44c9b118a057725b29043df88d1becc3f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -280,9 +280,8 @@ private[spark] object Utils extends Logging {
         }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
-        val env = SparkEnv.get
         val uri = new URI(url)
-        val conf = env.hadoop.newConfiguration()
+        val conf = SparkHadoopUtil.get.newConfiguration()
         val fs = FileSystem.get(uri, conf)
         val in = fs.open(new Path(uri))
         val out = new FileOutputStream(tempFile)
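
The fetch path above reads a remote file through the Hadoop FileSystem API with a configuration obtained from the singleton. A small hedged sketch of that read-and-copy pattern in isolation; the object and method names are made up, and it lives under org.apache.spark because SparkHadoopUtil is private[spark]:

    package org.apache.spark

    import java.io.{File, FileOutputStream}
    import java.net.URI

    import com.google.common.io.ByteStreams
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.deploy.SparkHadoopUtil

    object HadoopFetchSketch {
      // Copy the contents of a Hadoop-readable URL (file://, hdfs://, s3://, ...) to a local file.
      def fetch(url: String, dest: File) {
        val uri = new URI(url)
        val conf = SparkHadoopUtil.get.newConfiguration()
        val fs = FileSystem.get(uri, conf)
        val in = fs.open(new Path(uri))
        val out = new FileOutputStream(dest)
        try {
          ByteStreams.copy(in, out)
        } finally {
          in.close()
          out.close()
        }
      }
    }
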
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 646682878fda5373027b660c53c17e39176d0416..86dd9ca1b3e58032196de6d669307ab1ce009557 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -21,6 +21,7 @@ import java.util.Random
 import scala.math.exp
 import org.apache.spark.util.Vector
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.InputFormatInfo
 
 /**
@@ -51,7 +52,7 @@ object SparkHdfsLR {
       System.exit(1)
     }
     val inputPath = args(1)
-    val conf = SparkEnv.get.hadoop.newConfiguration()
+    val conf = SparkHadoopUtil.get.newConfiguration()
     val sc = new SparkContext(args(0), "SparkHdfsLR",
       System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(), 
       InputFormatInfo.computePreferredLocations(