Skip to content
Snippets Groups Projects
Commit ed8869eb authored by Ergin Seyfe's avatar Ergin Seyfe Committed by Marcelo Vanzin
Browse files

[SPARK-8617][WEBUI] HistoryServer: Include in-progress files during cleanup

## What changes were proposed in this pull request?
- Removed the `attempt.completed` filter so the cleaner would include orphan in-progress files.
- Use the loading time as `lastUpdated` for in-progress files, but keep using the modification time for completed files. The first change prevents deletion of in-progress job files; the second ensures that the `lastUpdated` time won't change for completed jobs in the event of a HistoryServer reboot.

## How was this patch tested?
Added new unit tests and verified via existing tests.

Author: Ergin Seyfe <eseyfe@fb.com>

Closes #16165 from seyfe/clear_old_inprogress_files.
parent b44d1b8f
No related branches found
No related tags found
No related merge requests found
...@@ -446,9 +446,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ...@@ -446,9 +446,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} }
val logPath = fileStatus.getPath() val logPath = fileStatus.getPath()
val appCompleted = isApplicationCompleted(fileStatus) val appCompleted = isApplicationCompleted(fileStatus)
// Use loading time as lastUpdated since some filesystems don't update modifiedTime
// each time file is updated. However use modifiedTime for completed jobs so lastUpdated
// won't change whenever HistoryServer restarts and reloads the file.
val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter) val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
// Without an app ID, new logs will render incorrectly in the listing page, so do not list or // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
...@@ -461,7 +465,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ...@@ -461,7 +465,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
appListener.appAttemptId, appListener.appAttemptId,
appListener.startTime.getOrElse(-1L), appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L), appListener.endTime.getOrElse(-1L),
fileStatus.getModificationTime(), lastUpdated,
appListener.sparkUser.getOrElse(NOT_STARTED), appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted, appCompleted,
fileStatus.getLen() fileStatus.getLen()
...@@ -546,7 +550,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ...@@ -546,7 +550,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = { def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
now - attempt.lastUpdated > maxAge && attempt.completed now - attempt.lastUpdated > maxAge
} }
// Scan all logs from the log directory. // Scan all logs from the log directory.
......
...@@ -66,7 +66,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ...@@ -66,7 +66,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
} }
test("Parse application logs") { test("Parse application logs") {
val provider = new FsHistoryProvider(createTestConf()) val clock = new ManualClock(12345678)
val provider = new FsHistoryProvider(createTestConf(), clock)
// Write a new-style application log. // Write a new-style application log.
val newAppComplete = newLogFile("new1", None, inProgress = false) val newAppComplete = newLogFile("new1", None, inProgress = false)
...@@ -109,12 +110,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ...@@ -109,12 +110,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed))) List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
} }
// For completed files, lastUpdated would be lastModified time.
list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L, list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
newAppComplete.lastModified(), "test", true)) newAppComplete.lastModified(), "test", true))
list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(), list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
1L, 4L, newAppCompressedComplete.lastModified(), "test", true)) 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
// For Inprogress files, lastUpdated would be current loading time.
list(2) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L, list(2) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
newAppIncomplete.lastModified(), "test", false)) clock.getTimeMillis(), "test", false))
// Make sure the UI can be rendered. // Make sure the UI can be rendered.
list.foreach { case info => list.foreach { case info =>
...@@ -299,6 +303,48 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ...@@ -299,6 +303,48 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
assert(!log2.exists()) assert(!log2.exists())
} }
test("log cleaner for inProgress files") {
val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10)
val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20)
val maxAge = TimeUnit.SECONDS.toMillis(40)
val clock = new ManualClock(0)
val provider = new FsHistoryProvider(
createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
val log1 = newLogFile("inProgressApp1", None, inProgress = true)
writeFile(log1, true, None,
SparkListenerApplicationStart(
"inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1"))
)
clock.setTime(firstFileModifiedTime)
provider.checkForLogs()
val log2 = newLogFile("inProgressApp2", None, inProgress = true)
writeFile(log2, true, None,
SparkListenerApplicationStart(
"inProgressApp2", Some("inProgressApp2"), 23L, "test2", Some("attempt2"))
)
clock.setTime(secondFileModifiedTime)
provider.checkForLogs()
// This should not trigger any cleanup
updateAndCheck(provider)(list => list.size should be(2))
// Should trigger cleanup for first file but not second one
clock.setTime(firstFileModifiedTime + maxAge + 1)
updateAndCheck(provider)(list => list.size should be(1))
assert(!log1.exists())
assert(log2.exists())
// Should cleanup the second file as well.
clock.setTime(secondFileModifiedTime + maxAge + 1)
updateAndCheck(provider)(list => list.size should be(0))
assert(!log1.exists())
assert(!log2.exists())
}
test("Event log copy") { test("Event log copy") {
val provider = new FsHistoryProvider(createTestConf()) val provider = new FsHistoryProvider(createTestConf())
val logs = (1 to 2).map { i => val logs = (1 to 2).map { i =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment