diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 8ef69b142cd15c770446ff713544374960dc5d6d..3011ed0f95d8e5af8aca74d256bfa970643065c2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -446,9 +446,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } val logPath = fileStatus.getPath() - val appCompleted = isApplicationCompleted(fileStatus) + // Use loading time as lastUpdated since some filesystems don't update modifiedTime + // each time file is updated. However use modifiedTime for completed jobs so lastUpdated + // won't change whenever HistoryServer restarts and reloads the file. + val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis() + val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter) // Without an app ID, new logs will render incorrectly in the listing page, so do not list or @@ -461,7 +465,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appListener.appAttemptId, appListener.startTime.getOrElse(-1L), appListener.endTime.getOrElse(-1L), - fileStatus.getModificationTime(), + lastUpdated, appListener.sparkUser.getOrElse(NOT_STARTED), appCompleted, fileStatus.getLen() @@ -546,7 +550,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = { - now - attempt.lastUpdated > maxAge && attempt.completed + now - attempt.lastUpdated > maxAge } // Scan all logs from the log directory. diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 2c41c432d1fe259a9fff6c5e11e89d8c5fcb538b..027f412c7581b679e69068d902721054a337a47e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -66,7 +66,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("Parse application logs") { - val provider = new FsHistoryProvider(createTestConf()) + val clock = new ManualClock(12345678) + val provider = new FsHistoryProvider(createTestConf(), clock) // Write a new-style application log. val newAppComplete = newLogFile("new1", None, inProgress = false) @@ -109,12 +110,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed))) } + // For completed files, lastUpdated would be lastModified time. list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L, newAppComplete.lastModified(), "test", true)) list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(), 1L, 4L, newAppCompressedComplete.lastModified(), "test", true)) + + // For Inprogress files, lastUpdated would be current loading time. list(2) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L, - newAppIncomplete.lastModified(), "test", false)) + clock.getTimeMillis(), "test", false)) // Make sure the UI can be rendered. list.foreach { case info => @@ -299,6 +303,48 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc assert(!log2.exists()) } + test("log cleaner for inProgress files") { + val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10) + val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20) + val maxAge = TimeUnit.SECONDS.toMillis(40) + val clock = new ManualClock(0) + val provider = new FsHistoryProvider( + createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock) + + val log1 = newLogFile("inProgressApp1", None, inProgress = true) + writeFile(log1, true, None, + SparkListenerApplicationStart( + "inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")) + ) + + clock.setTime(firstFileModifiedTime) + provider.checkForLogs() + + val log2 = newLogFile("inProgressApp2", None, inProgress = true) + writeFile(log2, true, None, + SparkListenerApplicationStart( + "inProgressApp2", Some("inProgressApp2"), 23L, "test2", Some("attempt2")) + ) + + clock.setTime(secondFileModifiedTime) + provider.checkForLogs() + + // This should not trigger any cleanup + updateAndCheck(provider)(list => list.size should be(2)) + + // Should trigger cleanup for first file but not second one + clock.setTime(firstFileModifiedTime + maxAge + 1) + updateAndCheck(provider)(list => list.size should be(1)) + assert(!log1.exists()) + assert(log2.exists()) + + // Should cleanup the second file as well. + clock.setTime(secondFileModifiedTime + maxAge + 1) + updateAndCheck(provider)(list => list.size should be(0)) + assert(!log1.exists()) + assert(!log2.exists()) + } + test("Event log copy") { val provider = new FsHistoryProvider(createTestConf()) val logs = (1 to 2).map { i =>