From ee6e9e7d863022304ac9ced405b353b63accb6ab Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Thu, 3 Apr 2014 22:13:56 -0700
Subject: [PATCH] SPARK-1337: Application web UI garbage collects newest stages instead of old ones

Simple fix: trimIfNecessary was evicting from the wrong end of the stage
list, using takeRight/trimEnd to drop the newest stages instead of the
oldest. It also computed toRemove = retainedStages / 10, which is 0
whenever spark.ui.retainedStages is below 10, so nothing was ever
trimmed and the buffer grew without bound. The fix evicts from the
front of the list with take/trimStart and clamps toRemove to at least 1.
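
A rough illustration of the two failure modes, as a standalone sketch
(a plain ListBuffer of stage ids stands in for the listener's internal
stage list; this is not the actual listener code):

    import scala.collection.mutable.ListBuffer

    val retainedStages = 5
    val stages = ListBuffer(1, 2, 3, 4, 5, 6)  // oldest stage first

    // Old logic: toRemove = retainedStages / 10 = 0 here, so nothing
    // was ever trimmed; and with retainedStages >= 10 it trimmed the
    // newest entries from the tail rather than the oldest from the head.

    // New logic: always evict at least one entry, from the head (oldest).
    val toRemove = math.max(retainedStages / 10, 1)
    stages.trimStart(toRemove)  // stages is now ListBuffer(2, 3, 4, 5, 6)

The new unit test exercises this path end to end: with
spark.ui.retainedStages=5 and 50 completed stages, only stages 46
through 50 survive eviction.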

Author: Patrick Wendell <pwendell@gmail.com>

Closes #320 from pwendell/stage-clean-up and squashes the following commits:

29be62e [Patrick Wendell] SPARK-1337: Application web UI garbage collects newest stages instead of old ones
---
 .../spark/ui/jobs/JobProgressListener.scala   |  8 ++---
 .../ui/jobs/JobProgressListenerSuite.scala    | 33 +++++++++++++++++--
 2 files changed, 35 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index d10aa12b9e..cd4be57227 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -81,8 +81,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
   /** If stages is too large, remove and garbage collect old stages */
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
-      val toRemove = retainedStages / 10
-      stages.takeRight(toRemove).foreach( s => {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stages.take(toRemove).foreach { s =>
         stageIdToTaskData.remove(s.stageId)
         stageIdToTime.remove(s.stageId)
         stageIdToShuffleRead.remove(s.stageId)
@@ -94,8 +94,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
         stageIdToTasksFailed.remove(s.stageId)
         stageIdToPool.remove(s.stageId)
         if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)}
-      })
-      stages.trimEnd(toRemove)
+      }
+      stages.trimStart(toRemove)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index d8a3e859f8..67ceee505d 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -18,13 +18,42 @@
 package org.apache.spark.ui.jobs
 
 import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.{LocalSparkContext, SparkContext, Success}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success}
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
-class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
+class JobProgressListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+  test("test LRU eviction of stages") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    val listener = new JobProgressListener(conf)
+
+    def createStageStartEvent(stageId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      SparkListenerStageSubmitted(stageInfo)
+    }
+
+    def createStageEndEvent(stageId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      SparkListenerStageCompleted(stageInfo)
+    }
+
+    for (i <- 1 to 50) {
+      listener.onStageSubmitted(createStageStartEvent(i))
+      listener.onStageCompleted(createStageEndEvent(i))
+    }
+
+    listener.completedStages.size should be (5)
+    listener.completedStages.filter(_.stageId == 50).size should be (1)
+    listener.completedStages.filter(_.stageId == 49).size should be (1)
+    listener.completedStages.filter(_.stageId == 48).size should be (1)
+    listener.completedStages.filter(_.stageId == 47).size should be (1)
+    listener.completedStages.filter(_.stageId == 46).size should be (1)
+  }
+
   test("test executor id to summary") {
     val sc = new SparkContext("local", "test")
     val listener = new JobProgressListener(sc.conf)
-- 
GitLab