diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f2517401cb76b38ae5e9ce76c7426481ddc3b56d..7fde34d8974c0bafe0292da5c916cb1832d54949 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1089,7 +1089,8 @@ class DAGScheduler(
         // To avoid UI cruft, ignore cases where value wasn't updated
         if (acc.name.isDefined && !updates.isZero) {
           stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
-          event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
+          event.taskInfo.setAccumulables(
+            acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
         }
       }
     } catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index eeb7963c9e610e3e42d6629d56041a7a600984af..59680139e7af3e69d5df3d632cd73a5c30265728 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection.mutable.ListBuffer
-
 import org.apache.spark.TaskState
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.annotation.DeveloperApi
@@ -54,7 +52,13 @@ class TaskInfo(
    * accumulable to be updated multiple times in a single task or for two accumulables with the
    * same name but different IDs to exist in a task.
    */
-  val accumulables = ListBuffer[AccumulableInfo]()
+  def accumulables: Seq[AccumulableInfo] = _accumulables
+
+  private[this] var _accumulables: Seq[AccumulableInfo] = Nil
+
+  private[spark] def setAccumulables(newAccumulables: Seq[AccumulableInfo]): Unit = {
+    _accumulables = newAccumulables
+  }
 
   /**
    * The time when the task has completed successfully (including the time to remotely fetch
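The `accumulables` field on `TaskInfo` changes here from a publicly mutable `ListBuffer` to a read-only `Seq` backed by a `private[this]` var, so updates now go through `setAccumulables`. A minimal usage sketch of the new pattern follows; the helper object and method names are hypothetical, and only `TaskInfo`, `AccumulableInfo`, and `setAccumulables` come from the patch itself.

package org.apache.spark.scheduler

// Hypothetical helper illustrating the new update pattern. setAccumulables is
// private[spark], so real callers live inside the org.apache.spark packages.
private[spark] object AccumulablesUpdateSketch {
  def record(taskInfo: TaskInfo, info: AccumulableInfo): Unit = {
    // Old API: taskInfo.accumulables += info   (appended to the mutable ListBuffer)
    // New API: build a fresh immutable Seq and install it; +: prepends in O(1),
    // which is why the DAGScheduler change above prepends rather than appends.
    taskInfo.setAccumulables(info +: taskInfo.accumulables)
  }
}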
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index f4a04609c4c694672867a2ff43f879113147a3fc..9ce8542f0279131b3e7eb3b5059d55bd473b7bd6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 import scala.collection.mutable.{HashMap, LinkedHashMap}
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.executor._
 import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
 import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet
@@ -147,9 +147,8 @@ private[spark] object UIData {
         memoryBytesSpilled = m.memoryBytesSpilled,
         diskBytesSpilled = m.diskBytesSpilled,
         peakExecutionMemory = m.peakExecutionMemory,
-        inputMetrics = InputMetricsUIData(m.inputMetrics.bytesRead, m.inputMetrics.recordsRead),
-        outputMetrics =
-          OutputMetricsUIData(m.outputMetrics.bytesWritten, m.outputMetrics.recordsWritten),
+        inputMetrics = InputMetricsUIData(m.inputMetrics),
+        outputMetrics = OutputMetricsUIData(m.outputMetrics),
         shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
         shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
     }
@@ -171,9 +170,9 @@ private[spark] object UIData {
         speculative = taskInfo.speculative
       )
       newTaskInfo.gettingResultTime = taskInfo.gettingResultTime
-      newTaskInfo.accumulables ++= taskInfo.accumulables.filter {
+      newTaskInfo.setAccumulables(taskInfo.accumulables.filter {
         accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
-      }
+      })
       newTaskInfo.finishTime = taskInfo.finishTime
       newTaskInfo.failed = taskInfo.failed
       newTaskInfo
@@ -197,8 +196,32 @@ private[spark] object UIData {
       shuffleWriteMetrics: ShuffleWriteMetricsUIData)
 
   case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
+  object InputMetricsUIData {
+    def apply(metrics: InputMetrics): InputMetricsUIData = {
+      if (metrics.bytesRead == 0 && metrics.recordsRead == 0) {
+        EMPTY
+      } else {
+        new InputMetricsUIData(
+          bytesRead = metrics.bytesRead,
+          recordsRead = metrics.recordsRead)
+      }
+    }
+    private val EMPTY = InputMetricsUIData(0, 0)
+  }
 
   case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long)
+  object OutputMetricsUIData {
+    def apply(metrics: OutputMetrics): OutputMetricsUIData = {
+      if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0) {
+        EMPTY
+      } else {
+        new OutputMetricsUIData(
+          bytesWritten = metrics.bytesWritten,
+          recordsWritten = metrics.recordsWritten)
+      }
+    }
+    private val EMPTY = OutputMetricsUIData(0, 0)
+  }
 
   case class ShuffleReadMetricsUIData(
       remoteBlocksFetched: Long,
@@ -212,17 +235,30 @@ private[spark] object UIData {
 
   object ShuffleReadMetricsUIData {
     def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = {
-      new ShuffleReadMetricsUIData(
-        remoteBlocksFetched = metrics.remoteBlocksFetched,
-        localBlocksFetched = metrics.localBlocksFetched,
-        remoteBytesRead = metrics.remoteBytesRead,
-        localBytesRead = metrics.localBytesRead,
-        fetchWaitTime = metrics.fetchWaitTime,
-        recordsRead = metrics.recordsRead,
-        totalBytesRead = metrics.totalBytesRead,
-        totalBlocksFetched = metrics.totalBlocksFetched
-      )
+      if (
+          metrics.remoteBlocksFetched == 0 &&
+          metrics.localBlocksFetched == 0 &&
+          metrics.remoteBytesRead == 0 &&
+          metrics.localBytesRead == 0 &&
+          metrics.fetchWaitTime == 0 &&
+          metrics.recordsRead == 0 &&
+          metrics.totalBytesRead == 0 &&
+          metrics.totalBlocksFetched == 0) {
+        EMPTY
+      } else {
+        new ShuffleReadMetricsUIData(
+          remoteBlocksFetched = metrics.remoteBlocksFetched,
+          localBlocksFetched = metrics.localBlocksFetched,
+          remoteBytesRead = metrics.remoteBytesRead,
+          localBytesRead = metrics.localBytesRead,
+          fetchWaitTime = metrics.fetchWaitTime,
+          recordsRead = metrics.recordsRead,
+          totalBytesRead = metrics.totalBytesRead,
+          totalBlocksFetched = metrics.totalBlocksFetched
+        )
+      }
     }
+    private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0)
   }
 
   case class ShuffleWriteMetricsUIData(
@@ -232,12 +268,17 @@ private[spark] object UIData {
 
   object ShuffleWriteMetricsUIData {
     def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = {
-      new ShuffleWriteMetricsUIData(
-        bytesWritten = metrics.bytesWritten,
-        recordsWritten = metrics.recordsWritten,
-        writeTime = metrics.writeTime
-      )
+      if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0 && metrics.writeTime == 0) {
+        EMPTY
+      } else {
+        new ShuffleWriteMetricsUIData(
+          bytesWritten = metrics.bytesWritten,
+          recordsWritten = metrics.recordsWritten,
+          writeTime = metrics.writeTime
+        )
+      }
     }
+    private val EMPTY = ShuffleWriteMetricsUIData(0, 0, 0)
   }
 
 }
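The companion `apply` methods added above all use the same trick: the all-zero metrics case, which dominates typical event logs, maps to one shared `EMPTY` instance instead of a fresh case-class allocation per task. A standalone sketch of that pattern under hypothetical names (`RawSummary`, `SummaryUIData`):

// Flyweight-style reuse of the all-zero value, mirroring the *MetricsUIData
// companion objects above. All names here are illustrative, not Spark APIs.
final class RawSummary(val bytesRead: Long, val recordsRead: Long)

case class SummaryUIData(bytesRead: Long, recordsRead: Long)

object SummaryUIData {
  // One shared instance for the common all-zero case; built with the synthetic
  // case-class apply, so this does not recurse into the overload below.
  private val EMPTY = SummaryUIData(0, 0)

  def apply(raw: RawSummary): SummaryUIData = {
    if (raw.bytesRead == 0 && raw.recordsRead == 0) {
      EMPTY // no allocation; thousands of zero summaries share one object
    } else {
      SummaryUIData(raw.bytesRead, raw.recordsRead)
    }
  }
}

// SummaryUIData(new RawSummary(0, 0)) eq SummaryUIData(new RawSummary(0, 0))  ==> true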
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6593aab33f6df0aa424c5705fe5318f43365383f..4b4d2d10cbf8d7d45e6a9919d8be8af5344a4797 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -702,8 +702,8 @@ private[spark] object JsonProtocol {
     val index = (json \ "Index").extract[Int]
     val attempt = Utils.jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
-    val executorId = (json \ "Executor ID").extract[String]
-    val host = (json \ "Host").extract[String]
+    val executorId = (json \ "Executor ID").extract[String].intern()
+    val host = (json \ "Host").extract[String].intern()
     val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
     val speculative = Utils.jsonOption(json \ "Speculative").exists(_.extract[Boolean])
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
@@ -721,7 +721,7 @@ private[spark] object JsonProtocol {
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
     taskInfo.killed = killed
-    accumulables.foreach { taskInfo.accumulables += _ }
+    taskInfo.setAccumulables(accumulables)
     taskInfo
   }
 
@@ -903,8 +903,8 @@ private[spark] object JsonProtocol {
     if (json == JNothing) {
       return null
     }
-    val executorId = (json \ "Executor ID").extract[String]
-    val host = (json \ "Host").extract[String]
+    val executorId = (json \ "Executor ID").extract[String].intern()
+    val host = (json \ "Host").extract[String].intern()
     val port = (json \ "Port").extract[Int]
     BlockManagerId(executorId, host, port)
   }
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 8418fa74d2c63f284ca8a9525517af5a7d0f65e5..da853f1be8b952e6f70fea576c1cf3838a4287ae 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -403,7 +403,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
       internal = false,
       countFailedValues = false,
       metadata = None)
-    taskInfo.accumulables ++= Seq(internalAccum, sqlAccum, userAccum)
+    taskInfo.setAccumulables(List(internalAccum, sqlAccum, userAccum))
 
     val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
     assert(newTaskInfo.accumulables === Seq(userAccum))
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d5146d70ebaa331abba2fff61d2e878c4ec52697..85da79180fd0b85cf151df42cf845676f3e3f4f7 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -788,11 +788,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
   private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
     val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL,
       speculative)
-    val (acc1, acc2, acc3) =
-      (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true))
-    taskInfo.accumulables += acc1
-    taskInfo.accumulables += acc2
-    taskInfo.accumulables += acc3
+    taskInfo.setAccumulables(
+      List(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)))
     taskInfo
   }
 
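The `.intern()` calls in the JsonProtocol changes above deduplicate executor IDs and hostnames, which repeat across many thousands of replayed task and block-manager events in the HistoryServer. A small self-contained sketch of the effect (the hostname is just a placeholder value):

// Each parsed event yields a brand-new String object, even when the text is
// identical; interning collapses equal values to one canonical JVM-wide instance.
val host1 = new String("worker-12.example.com") // simulate two parsed copies
val host2 = new String("worker-12.example.com")

assert(host1 == host2)                    // same characters
assert(!(host1 eq host2))                 // but two distinct heap objects
assert(host1.intern() eq host2.intern())  // interned: one shared instance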
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 350b144f8294b2f2e7afdaddaf0039c0b692f246..12f7ed202b9dbfffc2d02e2caa7a27645e43a359 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -86,7 +86,10 @@ object MimaExcludes {
       // [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness.
       ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"),
       ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"),
-      ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=")
+      ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),
+
+      // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables")
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 19b6d2603129c8c465d065eb8f32f04f1bc33f5a..948a155457b65466349f2d5a86e700206357fc5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -374,7 +374,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
     val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None)
     val taskInfo = createTaskInfo(0, 0)
-    taskInfo.accumulables ++= Seq(sqlMetricInfo, nonSqlMetricInfo)
+    taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo))
     val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null)
     listener.onOtherEvent(executionStart)
     listener.onJobStart(jobStart)
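For context on the new MimaExcludes entry: changing `TaskInfo.accumulables` from a `ListBuffer[AccumulableInfo]` to a `Seq[AccumulableInfo]` changes the method's bytecode return type, so MiMa reports an `IncompatibleResultTypeProblem` even though read-only callers keep compiling. A hedged sketch of the impact on a hypothetical external caller:

import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}

// Illustrative only; ExternalListenerSketch is not part of the patch.
object ExternalListenerSketch {
  // Read-only access still compiles against the new signature.
  def userAccumulables(taskInfo: TaskInfo): Seq[AccumulableInfo] =
    taskInfo.accumulables.filterNot(_.internal)

  // Code that relied on the old mutable ListBuffer, e.g.
  //   taskInfo.accumulables += someAccumulableInfo
  // no longer compiles (or links, hence the MiMa filter); updates must now go
  // through the private[spark] setAccumulables setter.
}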