From c99492141b1ddddb8edb6841a6e83748e5ba9bba Mon Sep 17 00:00:00 2001
From: Yuming Wang <wgyumg@gmail.com>
Date: Mon, 23 Jan 2017 11:02:22 +0000
Subject: [PATCH] [SPARK-19146][CORE] Drop more elements when
 stageData.taskData.size > retainedTasks

## What changes were proposed in this pull request?

Drop more elements when `stageData.taskData.size > retainedTasks`, to reduce the number of times the drop function is called.

## How was this patch tested?

Jenkins

Author: Yuming Wang <wgyumg@gmail.com>

Closes #16527 from wangyum/SPARK-19146.
---
 .../spark/ui/jobs/JobProgressListener.scala  | 14 +++++++--
 .../ui/jobs/JobProgressListenerSuite.scala   | 30 +++++++++++++++++++
 docs/configuration.md                        | 12 ++++----
 3 files changed, 47 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 83dc5d8745..e87caff426 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -142,7 +142,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   /** If stages is too large, remove and garbage collect old stages */
   private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
-      val toRemove = (stages.size - retainedStages)
+      val toRemove = calculateNumberToRemove(stages.size, retainedStages)
       stages.take(toRemove).foreach { s =>
         stageIdToData.remove((s.stageId, s.attemptId))
         stageIdToInfo.remove(s.stageId)
@@ -154,7 +154,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   /** If jobs is too large, remove and garbage collect old jobs */
   private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
     if (jobs.size > retainedJobs) {
-      val toRemove = (jobs.size - retainedJobs)
+      val toRemove = calculateNumberToRemove(jobs.size, retainedJobs)
       jobs.take(toRemove).foreach { job =>
         // Remove the job's UI data, if it exists
         jobIdToData.remove(job.jobId).foreach { removedJob =>
@@ -409,7 +409,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
       // If Tasks is too large, remove and garbage collect old tasks
       if (stageData.taskData.size > retainedTasks) {
-        stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks)
+        stageData.taskData = stageData.taskData.drop(
+          calculateNumberToRemove(stageData.taskData.size, retainedTasks))
       }
 
       for (
@@ -430,6 +431,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     }
   }
 
+  /**
+   * Remove at least (maxRetained / 10) items to reduce friction.
+   */
+  private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int = {
+    math.max(retainedSize / 10, dataSize - retainedSize)
+  }
+
   /**
    * Upon receiving new metrics for a task, updates the per-stage and per-executor-per-stage
    * aggregate metrics by calculating deltas between the currently recorded metrics and the new
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index da853f1be8..e3127da9a6 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -408,4 +408,34 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
     assert(newTaskInfo.accumulables === Seq(userAccum))
   }
+
+  test("SPARK-19146 drop more elements when stageData.taskData.size > retainedTasks") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedTasks", "100")
+    val taskMetrics = TaskMetrics.empty
+    taskMetrics.mergeShuffleReadMetrics()
+    val task = new ShuffleMapTask(0)
+    val taskType = Utils.getFormattedClassName(task)
+
+    val listener1 = new JobProgressListener(conf)
+    for (t <- 1 to 101) {
+      val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+      taskInfo.finishTime = 1
+      listener1.onTaskEnd(
+        SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    }
+    // 101 - math.max(100 / 10, 101 - 100) = 91
+    assert(listener1.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 91)
+
+    val listener2 = new JobProgressListener(conf)
+    for (t <- 1 to 150) {
+      val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+      taskInfo.finishTime = 1
+      listener2.onTaskEnd(
+        SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    }
+    // 150 - math.max(100 / 10, 150 - 100) = 100
+    assert(listener2.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 100)
+  }
+
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 7a11a983d5..a6b1f15fda 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -684,24 +684,24 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.ui.retainedJobs</code></td>
   <td>1000</td>
   <td>
-    How many jobs the Spark UI and status APIs remember before garbage
-    collecting.
+    How many jobs the Spark UI and status APIs remember before garbage collecting.
+    This is a target maximum, and fewer elements may be retained in some circumstances.
   </td>
 </tr>
 <tr>
   <td><code>spark.ui.retainedStages</code></td>
   <td>1000</td>
   <td>
-    How many stages the Spark UI and status APIs remember before garbage
-    collecting.
+    How many stages the Spark UI and status APIs remember before garbage collecting.
+    This is a target maximum, and fewer elements may be retained in some circumstances.
   </td>
 </tr>
 <tr>
   <td><code>spark.ui.retainedTasks</code></td>
   <td>100000</td>
   <td>
-    How many tasks the Spark UI and status APIs remember before garbage
-    collecting.
+    How many tasks the Spark UI and status APIs remember before garbage collecting.
+    This is a target maximum, and fewer elements may be retained in some circumstances.
   </td>
 </tr>
 <tr>
-- 
GitLab
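For context, here is a minimal, self-contained sketch of the batching effect the new `calculateNumberToRemove` helper is after. Only the helper's body mirrors the patch; the `TrimSketch` object, the `retainedTasks = 100` constant, and the 1000-insertion loop are illustrative stand-ins, not Spark code.

```scala
import scala.collection.mutable.LinkedHashMap

object TrimSketch {
  // Same formula as the helper added to JobProgressListener:
  // whenever we trim at all, drop at least retainedSize / 10 elements.
  def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int =
    math.max(retainedSize / 10, dataSize - retainedSize)

  def main(args: Array[String]): Unit = {
    val retainedTasks = 100                   // stand-in for spark.ui.retainedTasks
    var taskData = LinkedHashMap[Long, String]()
    var dropCalls = 0                         // how many times drop() actually ran

    for (taskId <- 1L to 1000L) {
      taskData += taskId -> s"task-$taskId"   // one entry per completed task
      if (taskData.size > retainedTasks) {
        taskData = taskData.drop(calculateNumberToRemove(taskData.size, retainedTasks))
        dropCalls += 1
      }
    }

    // With the old rule (dataSize - retainedSize) this loop would call drop on
    // almost every task end (~900 times); the batched rule calls it ~90 times.
    println(s"drop called $dropCalls times, ${taskData.size} tasks retained")
  }
}
```

With `retainedTasks = 100`, the old rule trims exactly one task on nearly every `onTaskEnd` once the limit is reached, while the batched rule trims at least `retainedTasks / 10` tasks at a time. The trade-off is that up to 10% fewer tasks may be held right after a trim, which is why the `docs/configuration.md` entries above now describe the retained* settings as a target maximum.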