diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 83dc5d874589e90571549efa6bcc6b44da67ca64..e87caff426436b72d3b0b13efcbb057ba6e93a18 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -142,7 +142,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   /** If stages is too large, remove and garbage collect old stages */
   private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
-      val toRemove = (stages.size - retainedStages)
+      val toRemove = calculateNumberToRemove(stages.size, retainedStages)
       stages.take(toRemove).foreach { s =>
         stageIdToData.remove((s.stageId, s.attemptId))
         stageIdToInfo.remove(s.stageId)
@@ -154,7 +154,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   /** If jobs is too large, remove and garbage collect old jobs */
   private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
     if (jobs.size > retainedJobs) {
-      val toRemove = (jobs.size - retainedJobs)
+      val toRemove = calculateNumberToRemove(jobs.size, retainedJobs)
       jobs.take(toRemove).foreach { job =>
         // Remove the job's UI data, if it exists
         jobIdToData.remove(job.jobId).foreach { removedJob =>
@@ -409,7 +409,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
     // If Tasks is too large, remove and garbage collect old tasks
     if (stageData.taskData.size > retainedTasks) {
-      stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks)
+      stageData.taskData = stageData.taskData.drop(
+        calculateNumberToRemove(stageData.taskData.size, retainedTasks))
     }
 
     for (
@@ -430,6 +431,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     }
   }
 
+  /**
+   * Remove at least (maxRetained / 10) items to reduce friction.
+   */
+  private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int = {
+    math.max(retainedSize / 10, dataSize - retainedSize)
+  }
+
   /**
    * Upon receiving new metrics for a task, updates the per-stage and per-executor-per-stage
    * aggregate metrics by calculating deltas between the currently recorded metrics and the new
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index da853f1be8b952e6f70fea576c1cf3838a4287ae..e3127da9a6b24a7be5109d43c05a91d1ab0a8ce7 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -408,4 +408,34 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
     assert(newTaskInfo.accumulables === Seq(userAccum))
   }
+
+  test("SPARK-19146 drop more elements when stageData.taskData.size > retainedTasks") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedTasks", "100")
+    val taskMetrics = TaskMetrics.empty
+    taskMetrics.mergeShuffleReadMetrics()
+    val task = new ShuffleMapTask(0)
+    val taskType = Utils.getFormattedClassName(task)
+
+    val listener1 = new JobProgressListener(conf)
+    for (t <- 1 to 101) {
+      val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+      taskInfo.finishTime = 1
+      listener1.onTaskEnd(
+        SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    }
+    // 101 - math.max(100 / 10, 101 - 100) = 91
+    assert(listener1.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 91)
+
+    val listener2 = new JobProgressListener(conf)
+    for (t <- 1 to 150) {
+      val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+      taskInfo.finishTime = 1
+      listener2.onTaskEnd(
+        SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    }
+    // 150 - math.max(100 / 10, 150 - 100) = 100
+    assert(listener2.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 100)
+  }
+
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 7a11a983d5972eef18ddebc5b0ed3f2514b69b88..a6b1f15fdabfc94b469f94b2c9e0497e66486b15 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -684,24 +684,24 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.ui.retainedJobs</code></td>
   <td>1000</td>
   <td>
-    How many jobs the Spark UI and status APIs remember before garbage
-    collecting.
+    How many jobs the Spark UI and status APIs remember before garbage collecting.
+    This is a target maximum, and fewer elements may be retained in some circumstances.
   </td>
 </tr>
 <tr>
   <td><code>spark.ui.retainedStages</code></td>
   <td>1000</td>
   <td>
-    How many stages the Spark UI and status APIs remember before garbage
-    collecting.
+    How many stages the Spark UI and status APIs remember before garbage collecting.
+    This is a target maximum, and fewer elements may be retained in some circumstances.
   </td>
 </tr>
 <tr>
   <td><code>spark.ui.retainedTasks</code></td>
   <td>100000</td>
   <td>
-    How many tasks the Spark UI and status APIs remember before garbage
-    collecting.
+    How many tasks the Spark UI and status APIs remember before garbage collecting.
+    This is a target maximum, and fewer elements may be retained in some circumstances.
   </td>
 </tr>
 <tr>
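
---
Reviewer note, not part of the patch: a minimal standalone sketch of the batching arithmetic behind calculateNumberToRemove, so the expected sizes asserted in the new test (91 and 100) can be checked in a Scala REPL. The object name TrimMathSketch is hypothetical.

    object TrimMathSketch {
      // Mirrors the helper added above: remove at least a tenth of the retained
      // budget, or the overflow past the budget, whichever is larger.
      def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int =
        math.max(retainedSize / 10, dataSize - retainedSize)

      def main(args: Array[String]): Unit = {
        // One element over a budget of 100 drops a batch of 10, so the next
        // nine arrivals trigger no further trimming: 101 - 10 = 91 kept.
        println(calculateNumberToRemove(101, 100)) // prints 10
        // Far over budget, the overflow term dominates: 150 - 50 = 100 kept.
        println(calculateNumberToRemove(150, 100)) // prints 50
      }
    }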
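Usage note, also not part of the patch: the three properties touched in docs/configuration.md can be set in spark-defaults.conf or on a SparkConf before the context starts; the values below are illustrative only. After this change each value is a target maximum, because a trim may remove up to an extra tenth of the budget to batch the work.

    import org.apache.spark.SparkConf

    // Illustrative values only; the defaults are 1000 / 1000 / 100000.
    val conf = new SparkConf()
      .set("spark.ui.retainedJobs", "500")
      .set("spark.ui.retainedStages", "500")
      .set("spark.ui.retainedTasks", "10000")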