From 46e224aaa26df4b232c5176e98472a902862b76c Mon Sep 17 00:00:00 2001 From: Rahul Singhal <rahul.singhal@guavus.com> Date: Thu, 24 Jul 2014 09:31:04 -0500 Subject: [PATCH] SPARK-2150: Provide direct link to finished application UI in yarn resou... ...rce manager UI Use the event logger directory to provide a direct link to finished application UI in yarn resourcemanager UI. Author: Rahul Singhal <rahul.singhal@guavus.com> Closes #1094 from rahulsinghaliitd/SPARK-2150 and squashes the following commits: 95f230c [Rahul Singhal] SPARK-2150: Provide direct link to finished application UI in yarn resource manager UI --- .../spark/deploy/history/FsHistoryProvider.scala | 3 ++- .../spark/deploy/history/HistoryPage.scala | 2 +- .../spark/deploy/history/HistoryServer.scala | 4 +++- .../org/apache/spark/deploy/master/Master.scala | 11 +++++++---- .../spark/scheduler/EventLoggingListener.scala | 7 +++++++ .../spark/deploy/yarn/ApplicationMaster.scala | 4 +++- .../spark/deploy/yarn/ExecutorLauncher.scala | 2 +- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 16 ++++++++++++++++ .../cluster/YarnClientSchedulerBackend.scala | 3 ++- .../spark/deploy/yarn/ApplicationMaster.scala | 5 +++-- .../spark/deploy/yarn/ExecutorLauncher.scala | 2 +- 11 files changed, 46 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a8c9ac0724..01e7065c17 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -169,7 +169,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val ui: SparkUI = if (renderUI) { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) - new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) + new SparkUI(conf, appSecManager, replayBus, appId, + HistoryServer.UI_PATH_PREFIX + s"/$appId") // Do not call ui.bind() to avoid creating a new server for each application } else { null diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index a958c837c2..d7a3e3f120 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -75,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { "Last Updated") private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { - val uiAddress = "/history/" + info.id + val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" val startTime = UIUtils.formatDate(info.startTime) val endTime = UIUtils.formatDate(info.endTime) val duration = UIUtils.formatDuration(info.endTime - info.startTime) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 56b38ddfc9..cacb9da8c9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -114,7 +114,7 @@ class HistoryServer( attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) val contextHandler = new ServletContextHandler - contextHandler.setContextPath("/history") + contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX) contextHandler.addServlet(new ServletHolder(loaderServlet), "/*") attachHandler(contextHandler) } @@ -172,6 +172,8 @@ class HistoryServer( object HistoryServer extends Logging { private val conf = new SparkConf + val UI_PATH_PREFIX = "/history" + def main(argStrings: Array[String]) { SignalLogger.register(log) initSecurity() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index bb1fcc8190..21f8667819 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -35,6 +35,7 @@ import akka.serialization.SerializationExtension import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ +import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI @@ -664,9 +665,10 @@ private[spark] class Master( */ def rebuildSparkUI(app: ApplicationInfo): Boolean = { val appName = app.desc.name + val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found" val eventLogDir = app.desc.eventLogDir.getOrElse { // Event logging is not enabled for this application - app.desc.appUiUrl = "/history/not-found" + app.desc.appUiUrl = notFoundBasePath return false } val fileSystem = Utils.getHadoopFileSystem(eventLogDir) @@ -681,13 +683,14 @@ private[spark] class Master( logWarning(msg) msg += " Did you specify the correct logging directory?" msg = URLEncoder.encode(msg, "UTF-8") - app.desc.appUiUrl = s"/history/not-found?msg=$msg&title=$title" + app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title" return false } try { val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) - val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id) + val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", + HistoryServer.UI_PATH_PREFIX + s"/${app.id}") replayBus.replay() appIdToUI(app.id) = ui webUi.attachSparkUI(ui) @@ -702,7 +705,7 @@ private[spark] class Master( var msg = s"Exception in replaying log for application $appName!" logError(msg, e) msg = URLEncoder.encode(msg, "UTF-8") - app.desc.appUiUrl = s"/history/not-found?msg=$msg&exception=$exception&title=$title" + app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title" false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index a90b0d475c..ae6ca9f4e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -63,6 +63,13 @@ private[spark] class EventLoggingListener( // For testing. Keep track of all JSON serialized events that have been logged. private[scheduler] val loggedEvents = new ArrayBuffer[JValue] + /** + * Return only the unique application directory without the base directory. + */ + def getApplicationLogDir(): String = { + name + } + /** * Begin logging events. * If compression is used, log a file that indicates which compression library is used. diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 3ec36487dc..62b5c3bc5f 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -60,6 +60,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private var yarnAllocator: YarnAllocationHandler = _ private var isFinished: Boolean = false private var uiAddress: String = _ + private var uiHistoryAddress: String = _ private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true @@ -237,6 +238,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, if (null != sparkContext) { uiAddress = sparkContext.ui.appUIHostPort + uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf) this.yarnAllocator = YarnAllocationHandler.newAllocator( yarnConf, resourceManager, @@ -360,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, finishReq.setAppAttemptId(appAttemptId) finishReq.setFinishApplicationStatus(status) finishReq.setDiagnostics(diagnostics) - finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", "")) + finishReq.setTrackingUrl(uiHistoryAddress) resourceManager.finishApplicationMaster(finishReq) } } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index a86ad256df..d232c18d2f 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -289,7 +289,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp .asInstanceOf[FinishApplicationMasterRequest] finishReq.setAppAttemptId(appAttemptId) finishReq.setFinishApplicationStatus(status) - finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", "")) + finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", "")) resourceManager.finishApplicationMaster(finishReq) } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 718cb19f57..e98308cdbd 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -30,6 +30,9 @@ import org.apache.hadoop.util.StringInterner import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.SparkHadoopUtil /** @@ -132,4 +135,17 @@ object YarnSparkHadoopUtil { } } + def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = { + val eventLogDir = sc.eventLogger match { + case Some(logger) => logger.getApplicationLogDir() + case None => "" + } + val historyServerAddress = conf.get("spark.yarn.historyServer.address", "") + if (historyServerAddress != "" && eventLogDir != "") { + historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir" + } else { + "" + } + } + } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index d8266f7b0c..77b91f8e26 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} import org.apache.spark.{SparkException, Logging, SparkContext} -import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher} +import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher, YarnSparkHadoopUtil} import org.apache.spark.scheduler.TaskSchedulerImpl import scala.collection.mutable.ArrayBuffer @@ -54,6 +54,7 @@ private[spark] class YarnClientSchedulerBackend( val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort) + conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf)) val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ( diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index eaf594c8b4..035356d390 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -59,6 +59,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private var yarnAllocator: YarnAllocationHandler = _ private var isFinished: Boolean = false private var uiAddress: String = _ + private var uiHistoryAddress: String = _ private val maxAppAttempts: Int = conf.getInt( YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) private var isLastAMRetry: Boolean = true @@ -216,6 +217,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, if (sparkContext != null) { uiAddress = sparkContext.ui.appUIHostPort + uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf) this.yarnAllocator = YarnAllocationHandler.newAllocator( yarnConf, amClient, @@ -312,8 +314,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, logInfo("Unregistering ApplicationMaster with " + status) if (registered) { - val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "") - amClient.unregisterApplicationMaster(status, diagnostics, trackingUrl) + amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) } } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 5ac95f3798..7158d9442a 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -250,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp def finishApplicationMaster(status: FinalApplicationStatus) { logInfo("Unregistering ApplicationMaster with " + status) - val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "") + val trackingUrl = sparkConf.get("spark.driver.appUIHistoryAddress", "") amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl) } -- GitLab