Commit 72ea56da authored by witgo, committed by Thomas Graves

SPARK-1291: Link the spark UI to RM ui in yarn-client mode

Author: witgo <witgo@qq.com>

Closes #1112 from witgo/SPARK-1291 and squashes the following commits:

6022bcd [witgo] review commit
1fbb925 [witgo] add addAmIpFilter to yarn alpha
210299c [witgo] review commit
1b92a07 [witgo] review commit
6896586 [witgo] Add comments to addWebUIFilter
3e9630b [witgo] review commit
142ee29 [witgo] review commit
1fe7710 [witgo] Link the spark UI to RM ui in yarn-client mode
parent 9dd635eb
@@ -66,4 +66,7 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage

+  case class AddWebUIFilter(filterName: String, filterParams: String, proxyBase: String)
+    extends CoarseGrainedClusterMessage
 }
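The new message carries everything the driver needs to install YARN's IP filter on its web UI. A minimal sketch of the intended flow (the host, port, and application id below are hypothetical placeholders; the filter class name comes from this commit):

// Sent from the YARN-side ExecutorLauncher actor to the driver's
// CoarseGrainedSchedulerBackend actor (see the later hunks):
driver ! AddWebUIFilter(
  filterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter",
  filterParams = "PROXY_HOST=rm-host,PROXY_URI_BASE=http://rm-host:8088/proxy/application_1",
  proxyBase = "/proxy/application_1")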
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.ui.JettyUtils

 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -136,6 +137,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         removeExecutor(executorId, reason)
         sender ! true

+      case AddWebUIFilter(filterName, filterParams, proxyBase) =>
+        addWebUIFilter(filterName, filterParams, proxyBase)
+        sender ! true
       case DisassociatedEvent(_, address, _) =>
         addressToExecutorId.get(address).foreach(removeExecutor(_,
           "remote Akka client disassociated"))
@@ -276,6 +280,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     }
     false
   }

+  // Add filters to the SparkUI
+  def addWebUIFilter(filterName: String, filterParams: String, proxyBase: String) {
+    if (proxyBase != null && proxyBase.nonEmpty) {
+      System.setProperty("spark.ui.proxyBase", proxyBase)
+    }
+    if (Seq(filterName, filterParams).forall(t => t != null && t.nonEmpty)) {
+      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+      conf.set("spark.ui.filters", filterName)
+      conf.set(s"spark.$filterName.params", filterParams)
+      JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
+    }
+  }
 }

 private[spark] object CoarseGrainedSchedulerBackend {
...
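For illustration, the filterParams string is a comma-separated list of key=value pairs, stored under the two config keys set above. A hedged sketch of how JettyUtils.addFilters might recover them (the parsing details here are assumptions, not part of this commit):

// Hypothetical parsing of the two keys written by addWebUIFilter:
//   spark.ui.filters          -> "org.apache...AmIpFilter"
//   spark.<filterName>.params -> "PROXY_HOST=rm-host,PROXY_URI_BASE=http://rm-host:8088/proxy/application_1"
val filterName = conf.get("spark.ui.filters")
val filterParams: Map[String, String] =
  conf.get(s"spark.$filterName.params")
    .split(',')
    .map(_.split('='))
    .collect { case Array(k, v) => k -> v }
    .toMap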
@@ -136,7 +136,16 @@ private[spark] object UIUtils extends Logging {
   }

   // Yarn has to go through a proxy so the base uri is provided and has to be on all links
-  val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
+  def uiRoot: String = {
+    if (System.getenv("APPLICATION_WEB_PROXY_BASE") != null) {
+      System.getenv("APPLICATION_WEB_PROXY_BASE")
+    } else if (System.getProperty("spark.ui.proxyBase") != null) {
+      System.getProperty("spark.ui.proxyBase")
+    } else {
+      ""
+    }
+  }

   def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
...
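For reference, the new lookup order (the YARN env var wins, then the system property set by addWebUIFilter in yarn-client mode, then empty) could equivalently be written with Option chaining; a sketch, not part of the commit:

def uiRoot: String =
  Option(System.getenv("APPLICATION_WEB_PROXY_BASE"))          // set by YARN in cluster mode
    .orElse(Option(System.getProperty("spark.ui.proxyBase")))  // set via AddWebUIFilter in yarn-client mode
    .getOrElse("")                                             // otherwise: no proxy prefix on links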
@@ -32,6 +32,7 @@ import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.deploy.SparkHadoopUtil

@@ -81,6 +82,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         driverClosed = true
+      case x: AddWebUIFilter =>
+        logInfo(s"Add WebUI Filter. $x")
+        driver ! x
     }
   }

@@ -111,7 +115,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }

     waitForSparkMaster()
+    addAmIpFilter()

     // Allocate all containers
     allocateExecutors()

@@ -171,7 +175,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   }

   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
+    val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
+    logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
       .asInstanceOf[RegisterApplicationMasterRequest]
     appMasterRequest.setApplicationAttemptId(appAttemptId)
@@ -180,10 +185,21 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
     // What do we provide here ? Might make sense to expose something sensible later ?
-    appMasterRequest.setTrackingUrl("")
+    appMasterRequest.setTrackingUrl(appUIAddress)
     resourceManager.registerApplicationMaster(appMasterRequest)
   }

+  // Add the Yarn AmIpFilter that Yarn requires for properly securing the UI
+  private def addAmIpFilter() {
+    val proxy = YarnConfiguration.getProxyHostAndPort(conf)
+    val parts = proxy.split(":")
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val uriBase = "http://" + proxy + proxyBase
+    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
+  }
+
   private def waitForSparkMaster() {
     logInfo("Waiting for spark driver to be reachable.")
     var driverUp = false
...
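To make the string assembly in addAmIpFilter concrete, here is how the pieces combine for a hypothetical ResourceManager proxy at rm-host:8088 (all values shown are placeholders):

// All values hypothetical, shown only to illustrate the format:
val proxy     = "rm-host:8088"                  // YarnConfiguration.getProxyHostAndPort(conf)
val parts     = proxy.split(":")                // Array("rm-host", "8088")
val proxyBase = "/proxy/application_1"          // APPLICATION_WEB_PROXY_BASE env var
val uriBase   = "http://" + proxy + proxyBase   // "http://rm-host:8088/proxy/application_1"
val amFilter  = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
// => "PROXY_HOST=rm-host,PROXY_URI_BASE=http://rm-host:8088/proxy/application_1"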
@@ -48,6 +48,7 @@ private[spark] class YarnClientSchedulerBackend(
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
+    conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)

     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
...
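This is the producer side of the handshake: the driver publishes its UI address before the ApplicationMaster starts, and registerApplicationMaster reads it back to hand to the RM as the tracking URL. A hypothetical round trip (the address value is a placeholder):

// Driver side (YarnClientSchedulerBackend), e.g. sc.ui.appUIHostPort == "driver-host:4040":
conf.set("spark.driver.appUIAddress", "driver-host:4040")

// AM side (ExecutorLauncher): read it back, defaulting to "" if unset:
val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")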
@@ -31,10 +31,12 @@ import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils

 /**
  * An application master that allocates executors on behalf of a driver that is running outside
@@ -82,6 +84,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         driverClosed = true
+      case x: AddWebUIFilter =>
+        logInfo(s"Add WebUI Filter. $x")
+        driver ! x
     }
   }

@@ -99,6 +104,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     registerApplicationMaster()

     waitForSparkMaster()
+    addAmIpFilter()

     // Allocate all containers
     allocateExecutors()

@@ -142,9 +148,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   }

   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
-    logInfo("Registering the ApplicationMaster")
-    // TODO: Find out client's Spark UI address and fill in here?
-    amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
+    val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
+    logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")
+    amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress)
+  }
+
+  // Add the Yarn AmIpFilter that Yarn requires for properly securing the UI
+  private def addAmIpFilter() {
+    val proxy = WebAppUtils.getProxyHostAndPort(conf)
+    val parts = proxy.split(":")
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val uriBase = "http://" + proxy + proxyBase
+    val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
+    val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase)
   }

   private def waitForSparkMaster() {
...
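The yarn-stable version mirrors yarn-alpha almost exactly; the only substantive difference is where the proxy host/port lookup lives in the two Hadoop versions, as this side-by-side sketch shows (both calls appear in the hunks above):

// yarn-alpha (older Hadoop): the lookup is a YarnConfiguration helper
val proxyAlpha  = YarnConfiguration.getProxyHostAndPort(conf)
// yarn-stable (newer Hadoop): the same lookup moved to WebAppUtils
val proxyStable = WebAppUtils.getProxyHostAndPort(conf)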