diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 0362f5a2233192df2387969823d0da0d92ed2577..573930dbf4e54781ea07a1ad746d0de5fed0d25c 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -106,6 +106,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     set this configuration to "hdfs:///some/path".
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.access.namenodes</code></td>
+  <td>(none)</td>
+  <td>
+    A list of secure HDFS namenodes your Spark application is going to access. For example, <code>spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032</code>. The Spark application must have access to the namenodes listed, and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters. Tokens for the default HDFS filesystem are always obtained, so only additional clusters need to be listed here.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index b7e8636e02eb215610d93f8dd40395a9229f7a1e..ed8f56ab8b75e9d738ebc3ef2ea99a18d2dd6652 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.util.StringUtils
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -191,23 +191,13 @@ trait ClientBase extends Logging {
     // Upload Spark and the application JAR to the remote file system if necessary. Add them as
     // local resources to the application master.
     val fs = FileSystem.get(conf)
-
-    val delegTokenRenewer = Master.getMasterPrincipal(conf)
-    if (UserGroupInformation.isSecurityEnabled()) {
-      if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        val errorMessage = "Can't get Master Kerberos principal for use as renewer"
-        logError(errorMessage)
-        throw new SparkException(errorMessage)
-      }
-    }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      val dstFs = dst.getFileSystem(conf)
-      dstFs.addDelegationTokens(delegTokenRenewer, credentials)
-    }
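+    // Build the set of namenodes to obtain tokens for: any extras the user listed in
+    // spark.yarn.access.namenodes, plus the filesystem hosting the staging directory.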
+    val nns = ClientBase.getNameNodesToAccess(sparkConf) + dst
+    ClientBase.obtainTokensForNamenodes(nns, conf, credentials)
 
+    val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
     val localResources = HashMap[String, LocalResource]()
     FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
 
@@ -614,4 +602,50 @@ object ClientBase extends Logging {
     YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
             File.pathSeparator)
 
+  /**
+   * Get the list of namenodes the user may access.
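+   * These come from the comma-separated spark.yarn.access.namenodes setting; blank
+   * entries are dropped and each remaining entry becomes a Path.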
+   */
+  private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+    sparkConf.get("spark.yarn.access.namenodes", "").split(",").map(_.trim()).filter(!_.isEmpty)
+      .map(new Path(_)).toSet
+  }
+
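+  /**
+   * Get the Kerberos principal to name as the renewer of delegation tokens, normally
+   * the YARN ResourceManager's principal. Throws a SparkException when no principal
+   * can be found, since tokens issued without a valid renewer could never be renewed.
+   */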
+  private[yarn] def getTokenRenewer(conf: Configuration): String = {
+    val delegTokenRenewer = Master.getMasterPrincipal(conf)
+    logDebug("delegation token renewer is: " + delegTokenRenewer)
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+      logError(errorMessage)
+      throw new SparkException(errorMessage)
+    }
+    delegTokenRenewer
+  }
+
+  /**
+   * Obtains tokens for the namenodes passed in and adds them to the credentials.
+   */
+  private[yarn] def obtainTokensForNamenodes(
+      paths: Set[Path],
+      conf: Configuration,
+      creds: Credentials) {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      val delegTokenRenewer = getTokenRenewer(conf)
+
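+      // Each path may live on a different (possibly remote) HDFS cluster; resolve its
+      // FileSystem and ask that namenode for tokens renewable by the principal above.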
+      paths.foreach { dst =>
+        val dstFs = dst.getFileSystem(conf)
+        logDebug("getting token for namenode: " + dst)
+        dstFs.addDelegationTokens(delegTokenRenewer, creds)
+      }
+    }
+  }
+
 }
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
index 686714dc3648871f4a6e924fcb7e1bc9513dc94a..68cc2890f3a22637617e9e1193e6fd091b28aabf 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -38,7 +38,7 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.{ HashMap => MutableHashMap }
 import scala.util.Try
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.util.Utils
 
 class ClientBaseSuite extends FunSuite with Matchers {
@@ -138,6 +138,59 @@ class ClientBaseSuite extends FunSuite with Matchers {
     }
   }
 
+  test("check access nns empty") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set())
+  }
+
+  test("check access nns unset") {
+    val sparkConf = new SparkConf()
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set())
+  }
+
+  test("check access nns") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032")))
+  }
+
+  test("check access nns space") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032")))
+  }
+
+  test("check access two nns") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
+  }
+
+  test("check token renewer") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+    hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+    val renewer = ClientBase.getTokenRenewer(hadoopConf)
+    renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+  }
+
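+  // Without yarn.resourcemanager.principal set, no renewer can be determined; the
+  // client should fail fast rather than request tokens that could never be renewed.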
+  test("check token renewer default") {
+    val hadoopConf = new Configuration()
+    val caught =
+      intercept[SparkException] {
+        ClientBase.getTokenRenewer(hadoopConf)
+      }
+    assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+  }
+
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =