diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 310febc352ef84ec59ba418be4133739d15b3265..ff48b155fab316b2a1de34c97799b9a3845938be 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -213,17 +213,32 @@ public class LevelDB implements KVStore { @Override public void close() throws IOException { - DB _db = this._db.getAndSet(null); - if (_db == null) { - return; + synchronized (this._db) { + DB _db = this._db.getAndSet(null); + if (_db == null) { + return; + } + + try { + _db.close(); + } catch (IOException ioe) { + throw ioe; + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } } + } - try { - _db.close(); - } catch (IOException ioe) { - throw ioe; - } catch (Exception e) { - throw new IOException(e.getMessage(), e); + /** + * Closes the given iterator if the DB is still open. Trying to close a JNI LevelDB handle + * with a closed DB can cause JVM crashes, so this ensures that situation does not happen. + */ + void closeIterator(LevelDBIterator it) throws IOException { + synchronized (this._db) { + DB _db = this._db.get(); + if (_db != null) { + it.close(); + } } } diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java index a2181f3874f86df260eaad0251d09b228ca29720..b3ba76ba58052bf2ce6562e07cf1abe999d232b9 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java @@ -191,6 +191,16 @@ class LevelDBIterator<T> implements KVStoreIterator<T> { } } + /** + * Because it's tricky to expose closeable iterators through many internal APIs, especially + * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by + * the iterator will eventually be released. 
+ */ + @Override + protected void finalize() throws Throwable { + db.closeIterator(this); + } + private byte[] loadNext() { if (count >= max) { return null; diff --git a/core/pom.xml b/core/pom.xml index da68abd855c7ca82675e80e0de441eee497ccd91..09669149d8123e2b29ace23f75ef557d437e2a29 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -67,6 +67,11 @@ <artifactId>spark-launcher_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-kvstore_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-network-common_${scala.binary.version}</artifactId> diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 910121e9878b98a6049b45dde1c26d807d049b66..3889dd097ee598bf163a0ee9fe27155655546d61 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,14 +17,17 @@ package org.apache.spark.deploy.history -import java.io.{FileNotFoundException, IOException, OutputStream} -import java.util.UUID -import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService, Future, TimeUnit} +import java.io.{File, FileNotFoundException, IOException} +import java.util.{Date, UUID} +import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.xml.Node +import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude} +import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.google.common.io.ByteStreams import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import org.apache.hadoop.fs.{FileStatus, Path} @@ -35,11 +38,14 @@ import org.apache.hadoop.security.AccessControlException import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} +import org.apache.spark.util.kvstore._ /** * A class that provides application history from event logs stored in the file system. @@ -50,11 +56,10 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} * * - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any * entries in the log dir whose modification time is greater than the last scan time - * are considered new or updated. These are replayed to create a new [[FsApplicationAttemptInfo]] - * entry and update or create a matching [[FsApplicationHistoryInfo]] element in the list - * of applications. + * are considered new or updated. These are replayed to create a new attempt info entry + * and update or create a matching application info element in the list of applications. * - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the - * [[FsApplicationAttemptInfo]] is replaced by another one with a larger log size. 
+ * attempt is replaced by another one with a larger log size. * - When [[updateProbe()]] is invoked to check if a loaded [[SparkUI]] * instance is out of date, the log size of the cached instance is checked against the app last * loaded by [[checkForLogs]]. @@ -78,6 +83,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) this(conf, new SystemClock()) } + import config._ import FsHistoryProvider._ // Interval between safemode checks. @@ -94,8 +100,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS, Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt) - private val logDir = conf.getOption("spark.history.fs.logDirectory") - .getOrElse(DEFAULT_LOG_DIR) + private val logDir = conf.get(EVENT_LOG_DIR) private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false) private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "") @@ -117,17 +122,38 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // used for logging msgs (logs are re-scanned based on file size, rather than modtime) private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1) - // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted - // into the map in order, so the LinkedHashMap maintains the correct ordering. - @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] - = new mutable.LinkedHashMap() + private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) - val fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]() + private val storePath = conf.get(LOCAL_STORE_DIR) - // List of application logs to be deleted by event log cleaner. - private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] + // Visible for testing. + private[history] val listing: KVStore = storePath.map { path => + val dbPath = new File(path, "listing.ldb") - private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) + def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer()) + + try { + val db = openDB() + val meta = db.getMetadata(classOf[KVStoreMetadata]) + + if (meta == null) { + db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir)) + db + } else if (meta.version != CURRENT_LISTING_VERSION || !logDir.equals(meta.logDir)) { + logInfo("Detected mismatched config in existing DB, deleting...") + db.close() + Utils.deleteRecursively(dbPath) + openDB() + } else { + db + } + } catch { + case _: UnsupportedStoreVersionException => + logInfo("Detected incompatible DB versions, deleting...") + Utils.deleteRecursively(dbPath) + openDB() + } + }.getOrElse(new InMemoryStore()) /** * Return a runnable that performs the given operation on the event logs. @@ -231,10 +257,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator + override def getListing(): Iterator[ApplicationHistoryInfo] = { + // Return the listing in end time descending order. 
+ listing.view(classOf[ApplicationInfoWrapper]) + .index("endTime") + .reverse() + .iterator() + .asScala + .map(_.toAppHistoryInfo()) + } - override def getApplicationInfo(appId: String): Option[FsApplicationHistoryInfo] = { - applications.get(appId) + override def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] = { + try { + Some(load(appId).toAppHistoryInfo()) + } catch { + case e: NoSuchElementException => + None + } } override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get() @@ -243,42 +282,40 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { - applications.get(appId).flatMap { appInfo => - appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => + val appInfo = load(appId) + appInfo.attempts + .find(_.info.attemptId == attemptId) + .map { attempt => val replayBus = new ReplayListenerBus() val ui = { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) - SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, - HistoryServer.getAttemptURI(appId, attempt.attemptId), - Some(attempt.lastUpdated), attempt.startTime) + SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + Some(attempt.info.lastUpdated.getTime()), attempt.info.startTime.getTime()) // Do not call ui.bind() to avoid creating a new server for each application } val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - - if (appListener.appId.isDefined) { - ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") - ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) - // make sure to set admin acls before view acls so they are properly picked up - val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") - ui.getSecurityManager.setAdminAcls(adminAcls) - ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse("")) - val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") - ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize))) - } else { - None - } - + assert(appListener.appId.isDefined) + ui.appSparkVersion = appListener.appSparkVersion.getOrElse("") + ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) + // make sure to set admin acls before view acls so they are properly picked up + val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") + ui.getSecurityManager.setAdminAcls(adminAcls) + ui.getSecurityManager.setViewAcls(attempt.info.sparkUser, + appListener.viewAcls.getOrElse("")) + val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + + appListener.adminAclsGroups.getOrElse("") + ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) + ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) + LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize)) } - } } catch { - case e: FileNotFoundException => None + case _: FileNotFoundException => None + case _: NoSuchElementException => None } } @@ -303,9 +340,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } override def stop(): 
Unit = { - if (initThread != null && initThread.isAlive()) { - initThread.interrupt() - initThread.join() + try { + if (initThread != null && initThread.isAlive()) { + initThread.interrupt() + initThread.join() + } + } finally { + listing.close() } } @@ -318,25 +359,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") - val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) - .getOrElse(Seq.empty[FileStatus]) // scan for modified applications, replay and merge them - val logInfos: Seq[FileStatus] = statusList + val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) .filter { entry => - val fileInfo = fileToAppInfo.get(entry.getPath()) - val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 0L !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. Accidentally // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. !entry.getPath().getName().startsWith(".") && - prevFileSize < entry.getLen() && - SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) + SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) && + recordedFileSize(entry.getPath()) < entry.getLen() } - .flatMap { entry => Some(entry) } .sortWith { case (entry1, entry2) => - entry1.getModificationTime() >= entry2.getModificationTime() - } + entry1.getModificationTime() > entry2.getModificationTime() + } if (logInfos.nonEmpty) { logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}") @@ -424,207 +460,104 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - applications.get(appId) match { - case Some(appInfo) => - try { - // If no attempt is specified, or there is no attemptId for attempts, return all attempts - appInfo.attempts.filter { attempt => - attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get - }.foreach { attempt => - val logPath = new Path(logDir, attempt.logPath) - zipFileToStream(logPath, attempt.logPath, zipStream) - } - } finally { - zipStream.close() + val app = try { + load(appId) + } catch { + case _: NoSuchElementException => + throw new SparkException(s"Logs for $appId not found.") + } + + try { + // If no attempt is specified, or there is no attemptId for attempts, return all attempts + attemptId + .map { id => app.attempts.filter(_.info.attemptId == Some(id)) } + .getOrElse(app.attempts) + .map(_.logPath) + .foreach { log => + zipFileToStream(new Path(logDir, log), log, zipStream) } - case None => throw new SparkException(s"Logs for $appId not found.") + } finally { + zipStream.close() } } /** - * Replay the log files in the list and merge the list of old applications with new ones + * Replay the given log file, saving the application in the listing db. */ protected def mergeApplicationListing(fileStatus: FileStatus): Unit = { - val newAttempts = try { - val eventsFilter: ReplayEventsFilter = { eventString => - eventString.startsWith(APPL_START_EVENT_PREFIX) || - eventString.startsWith(APPL_END_EVENT_PREFIX) || - eventString.startsWith(LOG_START_EVENT_PREFIX) - } - - val logPath = fileStatus.getPath() - val appCompleted = isApplicationCompleted(fileStatus) - - // Use loading time as lastUpdated since some filesystems don't update modifiedTime - // each time file is updated. 
However use modifiedTime for completed jobs so lastUpdated - // won't change whenever HistoryServer restarts and reloads the file. - val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis() - - val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter) - - // Without an app ID, new logs will render incorrectly in the listing page, so do not list or - // try to show their UI. - if (appListener.appId.isDefined) { - val attemptInfo = new FsApplicationAttemptInfo( - logPath.getName(), - appListener.appName.getOrElse(NOT_STARTED), - appListener.appId.getOrElse(logPath.getName()), - appListener.appAttemptId, - appListener.startTime.getOrElse(-1L), - appListener.endTime.getOrElse(-1L), - lastUpdated, - appListener.sparkUser.getOrElse(NOT_STARTED), - appCompleted, - fileStatus.getLen(), - appListener.appSparkVersion.getOrElse("") - ) - fileToAppInfo.put(logPath, attemptInfo) - logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo") - Some(attemptInfo) - } else { - logWarning(s"Failed to load application log ${fileStatus.getPath}. " + - "The application may have not started.") - None - } - - } catch { - case e: Exception => - logError( - s"Exception encountered when attempting to load application log ${fileStatus.getPath}", - e) - None - } - - if (newAttempts.isEmpty) { - return + val eventsFilter: ReplayEventsFilter = { eventString => + eventString.startsWith(APPL_START_EVENT_PREFIX) || + eventString.startsWith(APPL_END_EVENT_PREFIX) || + eventString.startsWith(LOG_START_EVENT_PREFIX) } - // Build a map containing all apps that contain new attempts. The app information in this map - // contains both the new app attempt, and those that were already loaded in the existing apps - // map. If an attempt has been updated, it replaces the old attempt in the list. - val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]() - - applications.synchronized { - newAttempts.foreach { attempt => - val appInfo = newAppMap.get(attempt.appId) - .orElse(applications.get(attempt.appId)) - .map { app => - val attempts = - app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt) - new FsApplicationHistoryInfo(attempt.appId, attempt.name, - attempts.sortWith(compareAttemptInfo)) - } - .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt))) - newAppMap(attempt.appId) = appInfo - } + val logPath = fileStatus.getPath() + logInfo(s"Replaying log path: $logPath") - // Merge the new app list with the existing one, maintaining the expected ordering (descending - // end time). Maintaining the order is important to avoid having to sort the list every time - // there is a request for the log list. 
- val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo) - val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - def addIfAbsent(info: FsApplicationHistoryInfo): Unit = { - if (!mergedApps.contains(info.id)) { - mergedApps += (info.id -> info) - } - } + val bus = new ReplayListenerBus() + val listener = new AppListingListener(fileStatus, clock) + bus.addListener(listener) - val newIterator = newApps.iterator.buffered - val oldIterator = applications.values.iterator.buffered - while (newIterator.hasNext && oldIterator.hasNext) { - if (newAppMap.contains(oldIterator.head.id)) { - oldIterator.next() - } else if (compareAppInfo(newIterator.head, oldIterator.head)) { - addIfAbsent(newIterator.next()) - } else { - addIfAbsent(oldIterator.next()) - } - } - newIterator.foreach(addIfAbsent) - oldIterator.foreach(addIfAbsent) - - applications = mergedApps - } + replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter) + listener.applicationInfo.foreach(addListing) + listing.write(LogInfo(logPath.toString(), fileStatus.getLen())) } /** * Delete event logs from the log directory according to the clean policy defined by the user. */ private[history] def cleanLogs(): Unit = { + var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None try { - val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000 - - val now = clock.getTimeMillis() - val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() - - def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = { - now - attempt.lastUpdated > maxAge - } + val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 + + // Iterate descending over all applications whose oldest attempt happened before maxTime. + iterator = Some(listing.view(classOf[ApplicationInfoWrapper]) + .index("oldestAttempt") + .reverse() + .first(maxTime) + .closeableIterator()) + + iterator.get.asScala.foreach { app => + // Applications may have multiple attempts, some of which may not need to be deleted yet. + val (remaining, toDelete) = app.attempts.partition { attempt => + attempt.info.lastUpdated.getTime() >= maxTime + } - // Scan all logs from the log directory. - // Only completed applications older than the specified max age will be deleted. 
- applications.values.foreach { app => - val (toClean, toRetain) = app.attempts.partition(shouldClean) - attemptsToClean ++= toClean - - if (toClean.isEmpty) { - appsToRetain += (app.id -> app) - } else if (toRetain.nonEmpty) { - appsToRetain += (app.id -> - new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList)) + if (remaining.nonEmpty) { + val newApp = new ApplicationInfoWrapper(app.info, remaining) + listing.write(newApp) } - } - applications = appsToRetain + toDelete.foreach { attempt => + val logPath = new Path(logDir, attempt.logPath) + try { + listing.delete(classOf[LogInfo], logPath.toString()) + } catch { + case _: NoSuchElementException => + logDebug(s"Log info entry for $logPath not found.") + } + try { + fs.delete(logPath, true) + } catch { + case e: AccessControlException => + logInfo(s"No permission to delete ${attempt.logPath}, ignoring.") + case t: IOException => + logError(s"IOException in cleaning ${attempt.logPath}", t) + } + } - val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] - attemptsToClean.foreach { attempt => - try { - fs.delete(new Path(logDir, attempt.logPath), true) - } catch { - case e: AccessControlException => - logInfo(s"No permission to delete ${attempt.logPath}, ignoring.") - case t: IOException => - logError(s"IOException in cleaning ${attempt.logPath}", t) - leftToClean += attempt + if (remaining.isEmpty) { + listing.delete(app.getClass(), app.id) } } - - attemptsToClean = leftToClean } catch { - case t: Exception => logError("Exception in cleaning logs", t) + case t: Exception => logError("Exception while cleaning logs", t) + } finally { + iterator.foreach(_.close()) } } - /** - * Comparison function that defines the sort order for the application listing. - * - * @return Whether `i1` should precede `i2`. - */ - private def compareAppInfo( - i1: FsApplicationHistoryInfo, - i2: FsApplicationHistoryInfo): Boolean = { - val a1 = i1.attempts.head - val a2 = i2.attempts.head - if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime - } - - /** - * Comparison function that defines the sort order for application attempts within the same - * application. Order is: attempts are sorted by descending start time. - * Most recent attempt state matches with current state of the app. - * - * Normally applications should have a single running attempt; but failure to call sc.stop() - * may cause multiple running attempts to show up. - * - * @return Whether `a1` should precede `a2`. - */ - private def compareAttemptInfo( - a1: FsApplicationAttemptInfo, - a2: FsApplicationAttemptInfo): Boolean = { - a1.startTime >= a2.startTime - } - /** * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns * an `ApplicationEventListener` instance with event data captured from the replay. 
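
For reviewers unfamiliar with the kvstore module, the following is a minimal sketch of the access pattern the new listing code relies on: a natural-key index plus a named index, written to an `InMemoryStore` and iterated in descending index order, the same shape as `getListing()` and `cleanLogs()` above. The `Entry` and `KVStoreSketch` names are hypothetical and exist only for illustration; the sketch assumes it compiles inside the `org.apache.spark.deploy.history` package, since the `config` object (and its `KVIndexParam` alias, added later in this patch) is `private[spark]`.

```scala
package org.apache.spark.deploy.history

import scala.collection.JavaConverters._

import org.apache.spark.deploy.history.config.KVIndexParam
import org.apache.spark.util.kvstore.InMemoryStore

// Hypothetical type, used only for this sketch; it mirrors LogInfo (natural key on a
// constructor param) and ApplicationInfoWrapper (named index exposed through a getter).
private[history] case class Entry(@KVIndexParam id: String, endTime: Long) {
  @KVIndexParam("endTime")
  def end: Long = endTime
}

private[history] object KVStoreSketch {
  def main(args: Array[String]): Unit = {
    val store = new InMemoryStore()
    Seq(Entry("app-1", 100L), Entry("app-2", 300L), Entry("app-3", 200L))
      .foreach { e => store.write(e) }

    // Iterate the "endTime" index in descending order, same shape as getListing() above.
    val ids = store.view(classOf[Entry])
      .index("endTime")
      .reverse()
      .iterator()
      .asScala
      .map(_.id)
      .toList
    assert(ids == List("app-2", "app-3", "app-1"))

    // Point lookup by the natural key, as in load() and recordedFileSize().
    assert(store.read(classOf[Entry], "app-2").endTime == 300L)
    store.close()
  }
}
```

A LevelDB-backed store behaves the same way through the `KVStore` interface, which is why `FsHistoryProvider` can fall back to `new InMemoryStore()` when `spark.history.store.path` is not set.
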
@@ -649,6 +582,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appListener = new ApplicationEventListener bus.addListener(appListener) bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter) + logInfo(s"Finished replaying $logPath") appListener } finally { logInput.close() @@ -685,26 +619,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * @return a summary of the component state */ override def toString: String = { - val header = s""" - | FsHistoryProvider: logdir=$logDir, - | last scan time=$lastScanTime - | Cached application count =${applications.size}} - """.stripMargin - val sb = new StringBuilder(header) - applications.foreach(entry => sb.append(entry._2).append("\n")) - sb.toString - } - - /** - * Look up an application attempt - * @param appId application ID - * @param attemptId Attempt ID, if set - * @return the matching attempt, if found - */ - def lookup(appId: String, attemptId: Option[String]): Option[FsApplicationAttemptInfo] = { - applications.get(appId).flatMap { appInfo => - appInfo.attempts.find(_.attemptId == attemptId) - } + val count = listing.count(classOf[ApplicationInfoWrapper]) + s"""|FsHistoryProvider{logdir=$logDir, + | storedir=$storePath, + | last scan time=$lastScanTime + | application count=$count}""".stripMargin } /** @@ -722,21 +641,69 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appId: String, attemptId: Option[String], prevFileSize: Long)(): Boolean = { - lookup(appId, attemptId) match { - case None => + try { + val attempt = getAttempt(appId, attemptId) + val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) + recordedFileSize(logPath) > prevFileSize + } catch { + case _: NoSuchElementException => logDebug(s"Application Attempt $appId/$attemptId not found") false - case Some(latest) => - prevFileSize < latest.fileSize } } -} -private[history] object FsHistoryProvider { - val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + /** + * Return the last known size of the given event log, recorded the last time the file + * system scanner detected a change in the file. + */ + private def recordedFileSize(log: Path): Long = { + try { + listing.read(classOf[LogInfo], log.toString()).fileSize + } catch { + case _: NoSuchElementException => 0L + } + } + + private def load(appId: String): ApplicationInfoWrapper = { + listing.read(classOf[ApplicationInfoWrapper], appId) + } + + /** + * Write the app's information to the given store. Serialized to avoid the (notedly rare) case + * where two threads are processing separate attempts of the same application. + */ + private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized { + val attempt = app.attempts.head + + val oldApp = try { + load(app.id) + } catch { + case _: NoSuchElementException => + app + } + + def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = { + a1.info.startTime.getTime() > a2.info.startTime.getTime() + } + + val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++ + List(attempt) + + val newAppInfo = new ApplicationInfoWrapper( + app.info, + attempts.sortWith(compareAttemptInfo)) + listing.write(newAppInfo) + } - private val NOT_STARTED = "<Not Started>" + /** For testing. Returns internal data about a single attempt. 
*/ + private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = { + load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse( + throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId.")) + } +} + +private[history] object FsHistoryProvider { private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads" private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\"" @@ -744,53 +711,145 @@ private[history] object FsHistoryProvider { private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" + + /** + * Current version of the data written to the listing database. When opening an existing + * db, if the version does not match this value, the FsHistoryProvider will throw away + * all data and re-generate the listing data from the event logs. + */ + private[history] val CURRENT_LISTING_VERSION = 1L } /** - * Application attempt information. - * - * @param logPath path to the log file, or, for a legacy log, its directory - * @param name application name - * @param appId application ID - * @param attemptId optional attempt ID - * @param startTime start time (from playback) - * @param endTime end time (from playback). -1 if the application is incomplete. - * @param lastUpdated the modification time of the log file when this entry was built by replaying - * the history. - * @param sparkUser user running the application - * @param completed flag to indicate whether or not the application has completed. - * @param fileSize the size of the log file the last time the file was scanned for changes + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. */ -private class FsApplicationAttemptInfo( +private class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) + +} + +private[history] case class KVStoreMetadata( + version: Long, + logDir: String) + +private[history] case class LogInfo( + @KVIndexParam logPath: String, + fileSize: Long) + +private[history] class AttemptInfoWrapper( + val info: v1.ApplicationAttemptInfo, val logPath: String, - val name: String, - val appId: String, - attemptId: Option[String], - startTime: Long, - endTime: Long, - lastUpdated: Long, - sparkUser: String, - completed: Boolean, - val fileSize: Long, - appSparkVersion: String) - extends ApplicationAttemptInfo( - attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) { - - /** extend the superclass string value with the extra attributes of this class */ - override def toString: String = { - s"FsApplicationAttemptInfo($name, $appId," + - s" ${super.toString}, source=$logPath, size=$fileSize" + val fileSize: Long) { + + def toAppAttemptInfo(): ApplicationAttemptInfo = { + ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), + info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser, + info.completed, info.appSparkVersion) } + } -/** - * Application history information - * @param id application ID - * @param name application name - * @param attempts list of attempts, most recent first. 
- */ -private class FsApplicationHistoryInfo( - id: String, - override val name: String, - override val attempts: List[FsApplicationAttemptInfo]) - extends ApplicationHistoryInfo(id, name, attempts) +private[history] class ApplicationInfoWrapper( + val info: v1.ApplicationInfo, + val attempts: List[AttemptInfoWrapper]) { + + @JsonIgnore @KVIndexParam + def id: String = info.id + + @JsonIgnore @KVIndexParam("endTime") + def endTime(): Long = attempts.head.info.endTime.getTime() + + @JsonIgnore @KVIndexParam("oldestAttempt") + def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min + + def toAppHistoryInfo(): ApplicationHistoryInfo = { + ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo())) + } + +} + +private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener { + + private val app = new MutableApplicationInfo() + private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen()) + + override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { + app.id = event.appId.orNull + app.name = event.appName + + attempt.attemptId = event.appAttemptId + attempt.startTime = new Date(event.time) + attempt.lastUpdated = new Date(clock.getTimeMillis()) + attempt.sparkUser = event.sparkUser + } + + override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { + attempt.endTime = new Date(event.time) + attempt.lastUpdated = new Date(log.getModificationTime()) + attempt.duration = event.time - attempt.startTime.getTime() + attempt.completed = true + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case SparkListenerLogStart(sparkVersion) => + attempt.appSparkVersion = sparkVersion + case _ => + } + + def applicationInfo: Option[ApplicationInfoWrapper] = { + if (app.id != null) { + Some(app.toView()) + } else { + None + } + } + + private class MutableApplicationInfo { + var id: String = null + var name: String = null + var coresGranted: Option[Int] = None + var maxCores: Option[Int] = None + var coresPerExecutor: Option[Int] = None + var memoryPerExecutorMB: Option[Int] = None + + def toView(): ApplicationInfoWrapper = { + val apiInfo = new v1.ApplicationInfo(id, name, coresGranted, maxCores, coresPerExecutor, + memoryPerExecutorMB, Nil) + new ApplicationInfoWrapper(apiInfo, List(attempt.toView())) + } + + } + + private class MutableAttemptInfo(logPath: String, fileSize: Long) { + var attemptId: Option[String] = None + var startTime = new Date(-1) + var endTime = new Date(-1) + var lastUpdated = new Date(-1) + var duration = 0L + var sparkUser: String = null + var completed = false + var appSparkVersion = "" + + def toView(): AttemptInfoWrapper = { + val apiInfo = new v1.ApplicationAttemptInfo( + attemptId, + startTime, + endTime, + lastUpdated, + duration, + sparkUser, + completed, + appSparkVersion) + new AttemptInfoWrapper( + apiInfo, + logPath, + fileSize) + } + + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala new file mode 100644 index 0000000000000000000000000000000000000000..fb9e997def0ddeaa5e88f476014bcfac8bc0db17 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.concurrent.TimeUnit + +import scala.annotation.meta.getter + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.util.kvstore.KVIndex + +private[spark] object config { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") + .stringConf + .createWithDefault(DEFAULT_LOG_DIR) + + val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("7d") + + val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path") + .doc("Local directory where to cache application history information. By default this is " + + "not set, meaning all history information will be kept in memory.") + .stringConf + .createOptional + +} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 05948f2661056ae52714aedffcc29b612b71c4e4..31659b25db318832e6587e57ee19de98c8268a13 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -20,6 +20,8 @@ import java.util.Date import scala.collection.Map +import com.fasterxml.jackson.annotation.JsonIgnoreProperties + import org.apache.spark.JobExecutionStatus class ApplicationInfo private[spark]( @@ -31,6 +33,9 @@ class ApplicationInfo private[spark]( val memoryPerExecutorMB: Option[Int], val attempts: Seq[ApplicationAttemptInfo]) +@JsonIgnoreProperties( + value = Array("startTimeEpoch", "endTimeEpoch", "lastUpdatedEpoch"), + allowGetters = true) class ApplicationAttemptInfo private[spark]( val attemptId: Option[String], val startTime: Date, @@ -40,9 +45,13 @@ class ApplicationAttemptInfo private[spark]( val sparkUser: String, val completed: Boolean = false, val appSparkVersion: String) { - def getStartTimeEpoch: Long = startTime.getTime - def getEndTimeEpoch: Long = endTime.getTime - def getLastUpdatedEpoch: Long = lastUpdated.getTime + + def getStartTimeEpoch: Long = startTime.getTime + + def getEndTimeEpoch: Long = endTime.getTime + + def getLastUpdatedEpoch: Long = lastUpdated.getTime + } class ExecutorStageSummary private[spark]( diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 7109146ece371356739ead512e3a76035d0c1422..2141934c9264076cb1ba6e66b2ad856a311c499f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -36,6 +36,7 @@ import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ import 
org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.io._ import org.apache.spark.scheduler._ @@ -66,9 +67,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc new File(logPath) } - test("Parse application logs") { + Seq(true, false).foreach { inMemory => + test(s"Parse application logs (inMemory = $inMemory)") { + testAppLogParsing(inMemory) + } + } + + private def testAppLogParsing(inMemory: Boolean) { val clock = new ManualClock(12345678) - val provider = new FsHistoryProvider(createTestConf(), clock) + val provider = new FsHistoryProvider(createTestConf(inMemory = inMemory), clock) // Write a new-style application log. val newAppComplete = newLogFile("new1", None, inProgress = false) @@ -172,20 +179,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ) updateAndCheck(provider) { list => list.size should be (1) - list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should - endWith(EventLoggingListener.IN_PROGRESS) + provider.getAttempt("app1", None).logPath should endWith(EventLoggingListener.IN_PROGRESS) } logFile1.renameTo(newLogFile("app1", None, inProgress = false)) updateAndCheck(provider) { list => list.size should be (1) - list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not - endWith(EventLoggingListener.IN_PROGRESS) + provider.getAttempt("app1", None).logPath should not endWith(EventLoggingListener.IN_PROGRESS) } } test("Parse logs that application is not started") { - val provider = new FsHistoryProvider((createTestConf())) + val provider = new FsHistoryProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -342,17 +347,23 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.checkForLogs() // This should not trigger any cleanup - updateAndCheck(provider)(list => list.size should be(2)) + updateAndCheck(provider) { list => + list.size should be(2) + } // Should trigger cleanup for first file but not second one clock.setTime(firstFileModifiedTime + maxAge + 1) - updateAndCheck(provider)(list => list.size should be(1)) + updateAndCheck(provider) { list => + list.size should be(1) + } assert(!log1.exists()) assert(log2.exists()) // Should cleanup the second file as well. 
clock.setTime(secondFileModifiedTime + maxAge + 1) - updateAndCheck(provider)(list => list.size should be(0)) + updateAndCheck(provider) { list => + list.size should be(0) + } assert(!log1.exists()) assert(!log2.exists()) } @@ -580,7 +591,34 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc securityManager.checkUIViewPermissions("user4") should be (false) securityManager.checkUIViewPermissions("user5") should be (false) } - } + } + + test("mismatched version discards old listing") { + val conf = createTestConf() + val oldProvider = new FsHistoryProvider(conf) + + val logFile1 = newLogFile("app1", None, inProgress = false) + writeFile(logFile1, true, None, + SparkListenerLogStart("2.3"), + SparkListenerApplicationStart("test", Some("test"), 1L, "test", None), + SparkListenerApplicationEnd(5L) + ) + + updateAndCheck(oldProvider) { list => + list.size should be (1) + } + assert(oldProvider.listing.count(classOf[ApplicationInfoWrapper]) === 1) + + // Manually overwrite the version in the listing db; this should cause the new provider to + // discard all data because the versions don't match. + val meta = new KVStoreMetadata(FsHistoryProvider.CURRENT_LISTING_VERSION + 1, + conf.get(LOCAL_STORE_DIR).get) + oldProvider.listing.setMetadata(meta) + oldProvider.stop() + + val mistatchedVersionProvider = new FsHistoryProvider(conf) + assert(mistatchedVersionProvider.listing.count(classOf[ApplicationInfoWrapper]) === 0) + } /** * Asks the provider to check for logs and calls a function to perform checks on the updated @@ -623,8 +661,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc new FileOutputStream(file).close() } - private def createTestConf(): SparkConf = { - new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + private def createTestConf(inMemory: Boolean = false): SparkConf = { + val conf = new SparkConf() + .set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + + if (!inMemory) { + conf.set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath()) + } + + conf } private class SafeModeTestProvider(conf: SparkConf, clock: Clock) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 18da8c18939ed49a1cb203c799c2f75d42aa31df..c11543a4b3ba283f80e054d537449130ed8d4e40 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -43,6 +43,7 @@ import org.scalatest.mockito.MockitoSugar import org.scalatest.selenium.WebBrowser import org.apache.spark._ +import org.apache.spark.deploy.history.config._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.UIData.JobUIData import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -64,6 +65,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers private val logDir = getTestResourcePath("spark-events") private val expRoot = getTestResourceFile("HistoryServerExpectations") + private val storeDir = Utils.createTempDir(namePrefix = "history") private var provider: FsHistoryProvider = null private var server: HistoryServer = null @@ -74,6 +76,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") .set("spark.testing", "true") + .set(LOCAL_STORE_DIR, 
storeDir.getAbsolutePath()) conf.setAll(extraConf) provider = new FsHistoryProvider(conf) provider.checkForLogs() @@ -87,14 +90,13 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers def stop(): Unit = { server.stop() + server = null } before { - init() - } - - after{ - stop() + if (server == null) { + init() + } } val cases = Seq( @@ -296,6 +298,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") .set("spark.testing", "true") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) provider = new FsHistoryProvider(conf) provider.checkForLogs() @@ -372,6 +375,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } test("incomplete apps get refreshed") { + server.stop() implicit val webDriver: WebDriver = new HtmlUnitDriver implicit val formats = org.json4s.DefaultFormats @@ -388,6 +392,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set("spark.history.fs.update.interval", "1s") .set("spark.eventLog.enabled", "true") .set("spark.history.cache.window", "250ms") + .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) .remove("spark.testing") val provider = new FsHistoryProvider(myConf) val securityManager = HistoryServer.createSecurityManager(myConf) @@ -413,8 +418,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } } - // stop the server with the old config, and start the new one - server.stop() server = new HistoryServer(myConf, provider, securityManager, 18080) server.initialize() server.bind() diff --git a/docs/monitoring.md b/docs/monitoring.md index 51084a25983eac61c118f39f5c3b7bf47c6f8f24..1ae43185d22f8cdf55e0139f8b94634cea7ed3f5 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -224,6 +224,15 @@ The history server can be configured as follows: Number of threads that will be used by history server to process event logs. </td> </tr> + <tr> + <td>spark.history.store.path</td> + <td>(none)</td> + <td> + Local directory where to cache application history data. If set, the history + server will store application data on disk instead of keeping it in memory. The data + written to disk will be re-used in the event of a history server restart. + </td> + </tr> </table> Note that in all of these UIs, the tables are sortable by clicking their headers, diff --git a/scalastyle-config.xml b/scalastyle-config.xml index bd7f462b722cd9c032a3971e591480aee02f863f..7bdd3fac773a3dc6c2eaf05bb49cce696c5b48c4 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -86,7 +86,7 @@ This file is divided into 3 sections: </check> <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true"> - <parameters><parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter></parameters> + <parameters><parameter name="regex"><![CDATA[(config|[A-Z][A-Za-z]*)]]></parameter></parameters> </check> <check level="error" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
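
To make the behavior of the new config entries concrete, here is a small sketch under the same assumptions as the earlier one (code living under `org.apache.spark`, since `SparkConf.set`/`get` with `ConfigEntry` values is `private[spark]`). `ConfigSketch` and the paths are hypothetical; the pattern mirrors `createTestConf()` in the test suite and the way `FsHistoryProvider` consumes the entries defined in `config.scala`.

```scala
package org.apache.spark.deploy.history

import org.apache.spark.SparkConf
import org.apache.spark.deploy.history.config._

object ConfigSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; conf.set(entry, value) is private[spark], hence the package above.
    val conf = new SparkConf()
      .set("spark.history.fs.logDirectory", "hdfs://namenode/shared/spark-logs")
      .set("spark.history.fs.cleaner.maxAge", "14d")
      .set(LOCAL_STORE_DIR, "/var/spark/history-store")

    assert(conf.get(EVENT_LOG_DIR) == "hdfs://namenode/shared/spark-logs")
    assert(conf.get(MAX_LOG_AGE_S) == 14 * 24 * 60 * 60) // timeConf normalizes to seconds
    assert(conf.get(LOCAL_STORE_DIR) == Some("/var/spark/history-store"))
  }
}
```

In a deployment, the equivalent is setting `spark.history.store.path` (alongside the existing `spark.history.fs.logDirectory` and `spark.history.fs.cleaner.maxAge`) in the history server's configuration, typically via spark-defaults.conf; as the monitoring.md change notes, when the path is set the listing data is kept on disk and reused across history server restarts.
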