diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 7dae248e3e7db4bde020954fed578f585ee19323..10cbeb8b94325803b1b6013633957f62cd1d59fc 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils} -import org.apache.spark.{SparkConf, Logging} +import org.apache.spark.{SecurityManager, SparkConf, Logging} class ExecutorRunnable( @@ -46,7 +46,8 @@ class ExecutorRunnable( slaveId: String, hostname: String, executorMemory: Int, - executorCores: Int) + executorCores: Int, + securityMgr: SecurityManager) extends Runnable with ExecutorRunnableUtil with Logging { var rpc: YarnRPC = YarnRPC.create(conf) @@ -86,6 +87,8 @@ class ExecutorRunnable( logInfo("Setting up executor with commands: " + commands) ctx.setCommands(commands) + ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)) + // Send the start request to the ContainerManager val startReq = Records.newRecord(classOf[StartContainerRequest]) .asInstanceOf[StartContainerRequest] diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 9f9e16c06452b623844e7efea9efdd7522f7729e..85d6274df2fcbf302859f09363ab4bbd961b1927 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} -import org.apache.spark.SparkConf +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.scheduler.SplitInfo import org.apache.hadoop.conf.Configuration @@ -41,8 +41,9 @@ private[yarn] class YarnAllocationHandler( resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId, args: ApplicationMasterArguments, - preferredNodes: collection.Map[String, collection.Set[SplitInfo]]) - extends YarnAllocator(conf, sparkConf, args, preferredNodes) { + preferredNodes: collection.Map[String, collection.Set[SplitInfo]], + securityMgr: SecurityManager) + extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) { private val lastResponseId = new AtomicInteger() private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList() diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala index cc5392192ec519fbdd8aa79489ff4030e67e60af..ad27a9ab781d28468f5ebbdec3f372a40dbd086b 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.scheduler.SplitInfo import org.apache.spark.util.Utils @@ -45,7 +45,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC sparkConf: SparkConf, preferredNodeLocations: Map[String, Set[SplitInfo]], uiAddress: String, - uiHistoryAddress: String) = { + uiHistoryAddress: String, + securityMgr: SecurityManager) = { this.rpc = YarnRPC.create(conf) this.uiHistoryAddress = uiHistoryAddress @@ -53,7 +54,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC registerApplicationMaster(uiAddress) new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args, - preferredNodeLocations) + preferredNodeLocations, securityMgr) } override def getAttemptId() = { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 98039a20de245d76d0f1d46c496e6bfe5491cb05..a879c833a014f03d0a0a2505782540d69561e21a 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -116,7 +116,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, val securityMgr = new SecurityManager(sparkConf) if (isDriver) { - runDriver() + runDriver(securityMgr) } else { runExecutorLauncher(securityMgr) } @@ -157,7 +157,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkContextRef.compareAndSet(sc, null) } - private def registerAM(uiAddress: String) = { + private def registerAM(uiAddress: String, securityMgr: SecurityManager) = { val sc = sparkContextRef.get() val appId = client.getAttemptId().getApplicationId().toString() @@ -170,13 +170,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, if (sc != null) sc.getConf else sparkConf, if (sc != null) sc.preferredNodeLocationData else Map(), uiAddress, - historyAddress) + historyAddress, + securityMgr) allocator.allocateResources() reporterThread = launchReporterThread() } - private def runDriver(): Unit = { + private def runDriver(securityMgr: SecurityManager): Unit = { addAmIpFilter() val userThread = startUserClass() @@ -188,7 +189,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, if (sc == null) { finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.") } else { - registerAM(sc.ui.appUIHostPort) + registerAM(sc.ui.appUIHostPort, securityMgr) try { userThread.join() } finally { @@ -203,7 +204,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, conf = sparkConf, securityManager = securityMgr)._1 actor = waitForSparkDriver() addAmIpFilter() - registerAM(sparkConf.get("spark.driver.appUIAddress", "")) + registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr) // In client mode the actor will stop the reporter thread. reporterThread.join() diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 5d8e5e6dffe7fd298c2d05805060528ec9c988b3..8075b7a7fb837ba89de7e10c716849607d5ea36c 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -430,10 +430,8 @@ trait ClientBase extends Logging { // send the acl settings into YARN to control who has access via YARN interfaces val securityManager = new SecurityManager(sparkConf) - val acls = Map[ApplicationAccessType, String] ( - ApplicationAccessType.VIEW_APP -> securityManager.getViewAcls, - ApplicationAccessType.MODIFY_APP -> securityManager.getModifyAcls) - amContainer.setApplicationACLs(acls) + amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager)) + amContainer } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index c74dd1c2b21dce3791d5bec9250e80ae72d59357..02b9a81bf6b502a4c29688398ec06bc6ebdeb027 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse -import org.apache.spark.{Logging, SparkConf, SparkEnv} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -55,7 +55,8 @@ private[yarn] abstract class YarnAllocator( conf: Configuration, sparkConf: SparkConf, args: ApplicationMasterArguments, - preferredNodes: collection.Map[String, collection.Set[SplitInfo]]) + preferredNodes: collection.Map[String, collection.Set[SplitInfo]], + securityMgr: SecurityManager) extends Logging { // These three are locked on allocatedHostToContainersMap. Complementary data structures @@ -280,7 +281,8 @@ private[yarn] abstract class YarnAllocator( executorId, executorHostname, executorMemory, - executorCores) + executorCores, + securityMgr) new Thread(executorRunnable).start() } } @@ -444,4 +446,4 @@ private[yarn] abstract class YarnAllocator( } -} \ No newline at end of file +} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 922d7d1a854a5c8f2cedc9d6581a78b3c80666c3..ed65e56b3e413e87b5aa68f31a3b1c1dcbbfe86d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -22,7 +22,7 @@ import scala.collection.{Map, Set} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.records._ -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.scheduler.SplitInfo /** @@ -45,7 +45,8 @@ trait YarnRMClient { sparkConf: SparkConf, preferredNodeLocations: Map[String, Set[SplitInfo]], uiAddress: String, - uiHistoryAddress: String): YarnAllocator + uiHistoryAddress: String, + securityMgr: SecurityManager): YarnAllocator /** * Shuts down the AM. Guaranteed to only be called once. diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index dc77f1236492dd51d21a41ef0033c3ebd645c1a1..4a33e34c3bfc7048aff4084ce17120baa34648a2 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -32,10 +32,11 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.StringInterner import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants +import org.apache.hadoop.yarn.api.records.ApplicationAccessType import org.apache.hadoop.yarn.util.RackResolver import org.apache.hadoop.conf.Configuration -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -211,4 +212,12 @@ object YarnSparkHadoopUtil { } } + private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager): + Map[ApplicationAccessType, String] = { + Map[ApplicationAccessType, String] ( + ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls, + ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls + ) + } + } diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 75db8ee6d468f7adddead91c815160682cb564a0..2cc5abb3a890c6e9e5986dd6abc51da4069dad9b 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -23,7 +23,10 @@ import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.scalatest.{FunSuite, Matchers} -import org.apache.spark.{Logging, SparkConf} +import org.apache.hadoop.yarn.api.records.ApplicationAccessType + +import org.apache.spark.{Logging, SecurityManager, SparkConf} + class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { @@ -74,4 +77,75 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { yarnConf.get(key) should not be default.get(key) } + + test("test getApplicationAclsForYarn acls on") { + + // spark acls on, just pick up default user + val sparkConf = new SparkConf() + sparkConf.set("spark.acls.enable", "true") + + val securityMgr = new SecurityManager(sparkConf) + val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) + + val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) + val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) + + viewAcls match { + case Some(vacls) => { + val aclSet = vacls.split(',').map(_.trim).toSet + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + modifyAcls match { + case Some(macls) => { + val aclSet = macls.split(',').map(_.trim).toSet + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + } + + test("test getApplicationAclsForYarn acls on and specify users") { + + // default spark acls are on and specify acls + val sparkConf = new SparkConf() + sparkConf.set("spark.acls.enable", "true") + sparkConf.set("spark.ui.view.acls", "user1,user2") + sparkConf.set("spark.modify.acls", "user3,user4") + + val securityMgr = new SecurityManager(sparkConf) + val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) + + val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) + val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) + + viewAcls match { + case Some(vacls) => { + val aclSet = vacls.split(',').map(_.trim).toSet + assert(aclSet.contains("user1")) + assert(aclSet.contains("user2")) + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + modifyAcls match { + case Some(macls) => { + val aclSet = macls.split(',').map(_.trim).toSet + assert(aclSet.contains("user3")) + assert(aclSet.contains("user4")) + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + + } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 07ba0a4b30bd7b05183ef0fbbc867e5c31d4fecc..833be12982e71676bdb70bb90e809ec87024013e 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} -import org.apache.spark.{SparkConf, Logging} +import org.apache.spark.{SecurityManager, SparkConf, Logging} class ExecutorRunnable( @@ -46,7 +46,8 @@ class ExecutorRunnable( slaveId: String, hostname: String, executorMemory: Int, - executorCores: Int) + executorCores: Int, + securityMgr: SecurityManager) extends Runnable with ExecutorRunnableUtil with Logging { var rpc: YarnRPC = YarnRPC.create(conf) @@ -85,6 +86,8 @@ class ExecutorRunnable( logInfo("Setting up executor with commands: " + commands) ctx.setCommands(commands) + ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)) + // Send the start request to the ContainerManager nmClient.startContainer(container, ctx) } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index ed31457b61571aad84ce7961c8e1b2b25a94ba97..c887cb52dd9cf95003c5f178f7449a41f0dfff24 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy.yarn import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} -import org.apache.spark.SparkConf +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.scheduler.SplitInfo import org.apache.hadoop.conf.Configuration @@ -39,8 +39,9 @@ private[yarn] class YarnAllocationHandler( amClient: AMRMClient[ContainerRequest], appAttemptId: ApplicationAttemptId, args: ApplicationMasterArguments, - preferredNodes: collection.Map[String, collection.Set[SplitInfo]]) - extends YarnAllocator(conf, sparkConf, args, preferredNodes) { + preferredNodes: collection.Map[String, collection.Set[SplitInfo]], + securityMgr: SecurityManager) + extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) { override protected def releaseContainer(container: Container) = { amClient.releaseAssignedContainer(container.getId()) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala index e8b8d9bc722bd0f10cbdf36533fb0b3a9929ce1a..54bc6b14c44ce7eae025c48676e5b784adfc206b 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.yarn.webapp.util.WebAppUtils -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.scheduler.SplitInfo import org.apache.spark.util.Utils @@ -46,7 +46,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC sparkConf: SparkConf, preferredNodeLocations: Map[String, Set[SplitInfo]], uiAddress: String, - uiHistoryAddress: String) = { + uiHistoryAddress: String, + securityMgr: SecurityManager) = { amClient = AMRMClient.createAMRMClient() amClient.init(conf) amClient.start() @@ -55,7 +56,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC logInfo("Registering the ApplicationMaster") amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args, - preferredNodeLocations) + preferredNodeLocations, securityMgr) } override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") =