diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a8c9ac072449fe6bb6570833bc42b101b3ad590b..01e7065c17b691eb07920ab96093985bf418f097 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -169,7 +169,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val ui: SparkUI = if (renderUI) { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) - new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) + new SparkUI(conf, appSecManager, replayBus, appId, + HistoryServer.UI_PATH_PREFIX + s"/$appId") // Do not call ui.bind() to avoid creating a new server for each application } else { null diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index a958c837c2ff66af91a36853c53dd2881cce499d..d7a3e3f120e67555dee134e019200b79e1ebdc80 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -75,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { "Last Updated") private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { - val uiAddress = "/history/" + info.id + val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" val startTime = UIUtils.formatDate(info.startTime) val endTime = UIUtils.formatDate(info.endTime) val duration = UIUtils.formatDuration(info.endTime - info.startTime) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 56b38ddfc93138830d06d9ff29e79a56144125ce..cacb9da8c947b106edbc767bcdbd884c4e3ae44c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -114,7 +114,7 @@ class HistoryServer( attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) val contextHandler = new ServletContextHandler - contextHandler.setContextPath("/history") + contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX) contextHandler.addServlet(new ServletHolder(loaderServlet), "/*") attachHandler(contextHandler) } @@ -172,6 +172,8 @@ class HistoryServer( object HistoryServer extends Logging { private val conf = new SparkConf + val UI_PATH_PREFIX = "/history" + def main(argStrings: Array[String]) { SignalLogger.register(log) initSecurity() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index bb1fcc8190fe469b3b3e02c91e11b982fffb95a5..21f8667819c44b7b4a54af4e825ef8bb8cbceab5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -35,6 +35,7 @@ import akka.serialization.SerializationExtension import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ +import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI @@ -664,9 +665,10 @@ private[spark] class Master( */ def rebuildSparkUI(app: ApplicationInfo): Boolean = { val appName = app.desc.name + val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found" val eventLogDir = app.desc.eventLogDir.getOrElse { // Event logging is not enabled for this application - app.desc.appUiUrl = "/history/not-found" + app.desc.appUiUrl = notFoundBasePath return false } val fileSystem = Utils.getHadoopFileSystem(eventLogDir) @@ -681,13 +683,14 @@ private[spark] class Master( logWarning(msg) msg += " Did you specify the correct logging directory?" msg = URLEncoder.encode(msg, "UTF-8") - app.desc.appUiUrl = s"/history/not-found?msg=$msg&title=$title" + app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title" return false } try { val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) - val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id) + val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", + HistoryServer.UI_PATH_PREFIX + s"/${app.id}") replayBus.replay() appIdToUI(app.id) = ui webUi.attachSparkUI(ui) @@ -702,7 +705,7 @@ private[spark] class Master( var msg = s"Exception in replaying log for application $appName!" logError(msg, e) msg = URLEncoder.encode(msg, "UTF-8") - app.desc.appUiUrl = s"/history/not-found?msg=$msg&exception=$exception&title=$title" + app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title" false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index a90b0d475c04ec769b3b6e17c0730cc3079ce9a2..ae6ca9f4e7bf5054f2556da98938919da68ee4cc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -63,6 +63,13 @@ private[spark] class EventLoggingListener( // For testing. Keep track of all JSON serialized events that have been logged. private[scheduler] val loggedEvents = new ArrayBuffer[JValue] + /** + * Return only the unique application directory without the base directory. + */ + def getApplicationLogDir(): String = { + name + } + /** * Begin logging events. * If compression is used, log a file that indicates which compression library is used. diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 3ec36487dcd2678cbba456f14dcf26a904f7ff5b..62b5c3bc5f0f3a93461c030b6e3c034654d54c68 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -60,6 +60,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private var yarnAllocator: YarnAllocationHandler = _ private var isFinished: Boolean = false private var uiAddress: String = _ + private var uiHistoryAddress: String = _ private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true @@ -237,6 +238,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, if (null != sparkContext) { uiAddress = sparkContext.ui.appUIHostPort + uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf) this.yarnAllocator = YarnAllocationHandler.newAllocator( yarnConf, resourceManager, @@ -360,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, finishReq.setAppAttemptId(appAttemptId) finishReq.setFinishApplicationStatus(status) finishReq.setDiagnostics(diagnostics) - finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", "")) + finishReq.setTrackingUrl(uiHistoryAddress) resourceManager.finishApplicationMaster(finishReq) } } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index a86ad256dfa39ca5d8578eb10b64f3dfa0ea8a3c..d232c18d2f5a4988f85b0322f4c18afd248e54f6 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -289,7 +289,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp .asInstanceOf[FinishApplicationMasterRequest] finishReq.setAppAttemptId(appAttemptId) finishReq.setFinishApplicationStatus(status) - finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", "")) + finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", "")) resourceManager.finishApplicationMaster(finishReq) } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 718cb19f57261018625735e51bb383caf98188cb..e98308cdbd74e869fcc7e309c582d91e34a5ef09 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -30,6 +30,9 @@ import org.apache.hadoop.util.StringInterner import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.SparkHadoopUtil /** @@ -132,4 +135,17 @@ object YarnSparkHadoopUtil { } } + def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = { + val eventLogDir = sc.eventLogger match { + case Some(logger) => logger.getApplicationLogDir() + case None => "" + } + val historyServerAddress = conf.get("spark.yarn.historyServer.address", "") + if (historyServerAddress != "" && eventLogDir != "") { + historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir" + } else { + "" + } + } + } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index d8266f7b0c9a711dca497370186bd719d904b513..77b91f8e260fe82969ff43fad3c007469772528a 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} import org.apache.spark.{SparkException, Logging, SparkContext} -import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher} +import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher, YarnSparkHadoopUtil} import org.apache.spark.scheduler.TaskSchedulerImpl import scala.collection.mutable.ArrayBuffer @@ -54,6 +54,7 @@ private[spark] class YarnClientSchedulerBackend( val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort) + conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf)) val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ( diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index eaf594c8b49b98a77fc11d41baef92f778dfbb62..035356d390c80b909920b4de20fdb15c15c4f639 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -59,6 +59,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private var yarnAllocator: YarnAllocationHandler = _ private var isFinished: Boolean = false private var uiAddress: String = _ + private var uiHistoryAddress: String = _ private val maxAppAttempts: Int = conf.getInt( YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) private var isLastAMRetry: Boolean = true @@ -216,6 +217,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, if (sparkContext != null) { uiAddress = sparkContext.ui.appUIHostPort + uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf) this.yarnAllocator = YarnAllocationHandler.newAllocator( yarnConf, amClient, @@ -312,8 +314,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, logInfo("Unregistering ApplicationMaster with " + status) if (registered) { - val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "") - amClient.unregisterApplicationMaster(status, diagnostics, trackingUrl) + amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) } } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 5ac95f379872331da1f64fadfcb2960aa820fb1b..7158d9442a459901e9bcb3118f482963f9265b07 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -250,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp def finishApplicationMaster(status: FinalApplicationStatus) { logInfo("Unregistering ApplicationMaster with " + status) - val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "") + val trackingUrl = sparkConf.get("spark.driver.appUIHistoryAddress", "") amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl) }