diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index d10aa12b9ebca3f1e002d24bc5ae2683ebdc529b..cd4be57227a16c062390b917cd33fb41e8597c5b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -81,8 +81,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
   /** If stages is too large, remove and garbage collect old stages */
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
-      val toRemove = retainedStages / 10
-      stages.takeRight(toRemove).foreach( s => {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stages.take(toRemove).foreach { s =>
         stageIdToTaskData.remove(s.stageId)
         stageIdToTime.remove(s.stageId)
         stageIdToShuffleRead.remove(s.stageId)
@@ -94,8 +94,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
         stageIdToTasksFailed.remove(s.stageId)
         stageIdToPool.remove(s.stageId)
         if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)}
-      })
-      stages.trimEnd(toRemove)
+      }
+      stages.trimStart(toRemove)
     }
   }
 
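For context on the hunk above: the old code computed toRemove as retainedStages / 10, which is zero for any retainedStages below 10, and then trimmed from the tail with takeRight/trimEnd. Since completed stages are appended to the end of the buffer (as the surviving ids 46-50 in the new test below imply), that either evicted the newest stages or, for small settings, evicted nothing at all. A minimal standalone sketch of the fixed behavior, in plain Scala with illustrative names rather than Spark code:

    import scala.collection.mutable.ListBuffer

    // Sketch only: mirrors the fixed trim logic on a bare ListBuffer.
    object TrimSketch extends App {
      val retainedStages = 5
      val stages = ListBuffer[Int]()
      for (stageId <- 1 to 50) {
        stages += stageId                                  // newest entries land at the end
        if (stages.size > retainedStages) {
          val toRemove = math.max(retainedStages / 10, 1)  // always drop at least one
          stages.trimStart(toRemove)                       // evict the oldest, at the front
        }
      }
      assert(stages == ListBuffer(46, 47, 48, 49, 50))     // only the five newest survive
    }
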
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index d8a3e859f85cdc65393d874046cd44d0e16ce84d..67ceee505db3ca954d2b988af8dfa6717d1cb249 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -18,13 +18,42 @@
 package org.apache.spark.ui.jobs
 
 import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.{LocalSparkContext, SparkContext, Success}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success}
 import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
-class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
+class JobProgressListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+  test("test LRU eviction of stages") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    val listener = new JobProgressListener(conf)
+
+    def createStageStartEvent(stageId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      SparkListenerStageSubmitted(stageInfo)
+    }
+
+    def createStageEndEvent(stageId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      SparkListenerStageCompleted(stageInfo)
+    }
+
+    for (i <- 1 to 50) {
+      listener.onStageSubmitted(createStageStartEvent(i))
+      listener.onStageCompleted(createStageEndEvent(i))
+    }
+
+    listener.completedStages.size should be (5)
+    listener.completedStages.filter(_.stageId == 50).size should be (1)
+    listener.completedStages.filter(_.stageId == 49).size should be (1)
+    listener.completedStages.filter(_.stageId == 48).size should be (1)
+    listener.completedStages.filter(_.stageId == 47).size should be (1)
+    listener.completedStages.filter(_.stageId == 46).size should be (1)
+  }
+
   test("test executor id to summary") {
     val sc = new SparkContext("local", "test")
     val listener = new JobProgressListener(sc.conf)
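
One note on the new test: since completedStages is trimmed strictly in completion order, with no recency tracking, what it exercises is FIFO eviction of the oldest completed stages rather than LRU. The five membership assertions could also be collapsed into a loop; an equivalent sketch in the same ShouldMatchers style, not part of the patch:

    // Equivalent to the five filter(_.stageId == n).size checks above (sketch):
    listener.completedStages.size should be (5)
    for (id <- 46 to 50) {
      listener.completedStages.count(_.stageId == id) should be (1)
    }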