diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index 601b503d12c7e324de8c275b72fccd3e776a3619..e8f053c150693448e31ba734f37d7de3a0f75571 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
 
 
 /**
- * A data type that can be accumulated, ie has an commutative and associative "add" operation,
+ * A data type that can be accumulated, i.e. has a commutative and associative "add" operation,
  * but where the result type, `R`, may be different from the element type being added, `T`.
  *
  * You must define how to add data, and how to merge two of these together.  For some data types,
@@ -36,19 +36,12 @@ import org.apache.spark.util.Utils
  * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
  * accumulating a set. You will add items to the set, and you will union two sets together.
  *
- * All accumulators created on the driver to be used on the executors must be registered with
- * [[Accumulators]]. This is already done automatically for accumulators created by the user.
- * Internal accumulators must be explicitly registered by the caller.
- *
  * Operations are not thread-safe.
  *
  * @param id ID of this accumulator; for internal use only.
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `R` and `T`
  * @param name human-readable name for use in Spark's web UI
- * @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
- *                 to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
- *                 thread safe so that they can be reported correctly.
  * @param countFailedValues whether to accumulate values from failed tasks. This is set to true
  *                          for system and time metrics like serialization time or bytes spilled,
  *                          and false for things with absolute values like number of input rows.
@@ -62,7 +55,6 @@ class Accumulable[R, T] private (
     @transient private val initialValue: R,
     param: AccumulableParam[R, T],
     val name: Option[String],
-    internal: Boolean,
     private[spark] val countFailedValues: Boolean)
   extends Serializable {
 
@@ -70,41 +62,21 @@ class Accumulable[R, T] private (
       initialValue: R,
       param: AccumulableParam[R, T],
       name: Option[String],
-      internal: Boolean,
       countFailedValues: Boolean) = {
-    this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues)
+    this(Accumulators.newId(), initialValue, param, name, countFailedValues)
   }
 
-  private[spark] def this(
-      initialValue: R,
-      param: AccumulableParam[R, T],
-      name: Option[String],
-      internal: Boolean) = {
-    this(initialValue, param, name, internal, false /* countFailedValues */)
+  private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = {
+    this(initialValue, param, name, false /* countFailedValues */)
   }
 
-  def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
-    this(initialValue, param, name, false /* internal */)
-
   def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)
 
   @volatile @transient private var value_ : R = initialValue // Current value on driver
   val zero = param.zero(initialValue) // Zero value to be passed to executors
   private var deserialized = false
 
-  // In many places we create internal accumulators without access to the active context cleaner,
-  // so if we register them here then we may never unregister these accumulators. To avoid memory
-  // leaks, we require the caller to explicitly register internal accumulators elsewhere.
-  if (!internal) {
-    Accumulators.register(this)
-  }
-
-  /**
-   * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
-   * via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so that they can be
-   * reported correctly.
-   */
-  private[spark] def isInternal: Boolean = internal
+  Accumulators.register(this)
 
   /**
    * Return a copy of this [[Accumulable]].
@@ -114,7 +86,7 @@ class Accumulable[R, T] private (
    * same mutable instance around.
    */
   private[spark] def copy(): Accumulable[R, T] = {
-    new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
+    new Accumulable[R, T](id, initialValue, param, name, countFailedValues)
   }
 
   /**
@@ -192,7 +164,8 @@ class Accumulable[R, T] private (
    * Create an [[AccumulableInfo]] representation of this [[Accumulable]] with the provided values.
    */
   private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    new AccumulableInfo(id, name, update, value, internal, countFailedValues)
+    val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+    new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
   }
 
   // Called by Java when deserializing an object
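
Note on the `toInfo` change above: with the `internal` constructor flag gone, internal-ness is now derived from the accumulator's name prefix. A minimal, self-contained sketch of that derivation follows; the prefix literal is an assumption standing in for `InternalAccumulator.METRICS_PREFIX`, and only the pattern is the point here.

// Sketch only: mirrors the name-prefix check used in Accumulable.toInfo above.
object InternalByNameSketch {
  // Assumed stand-in for InternalAccumulator.METRICS_PREFIX.
  val MetricsPrefix = "internal.metrics."

  def isInternal(name: Option[String]): Boolean =
    name.exists(_.startsWith(MetricsPrefix))

  def main(args: Array[String]): Unit = {
    assert(isInternal(Some(MetricsPrefix + "executorRunTime")))
    assert(!isInternal(Some("my.user.counter")))
    assert(!isInternal(None))
    println("internal-by-name checks passed")
  }
}

This keeps reporting unchanged for user accumulators, whose names never carry the metrics prefix, while internal metric accumulators can still be identified without extra constructor state.
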
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 985752933a6ab8d9b67c0d6cfc650cfeb44583be..0c17f014c90db4085e7244717789eb74d2c56968 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -56,7 +56,6 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `T`
  * @param name human-readable name associated with this accumulator
- * @param internal whether this accumulator is used internally within Spark only
  * @param countFailedValues whether to accumulate values from failed tasks
  * @tparam T result type
  */
@@ -64,19 +63,9 @@ class Accumulator[T] private[spark] (
     // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
     @transient private val initialValue: T,
     param: AccumulatorParam[T],
-    name: Option[String],
-    internal: Boolean,
-    private[spark] override val countFailedValues: Boolean = false)
-  extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {
-
-  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
-    this(initialValue, param, name, false /* internal */)
-  }
-
-  def this(initialValue: T, param: AccumulatorParam[T]) = {
-    this(initialValue, param, None, false /* internal */)
-  }
-}
+    name: Option[String] = None,
+    countFailedValues: Boolean = false)
+  extends Accumulable[T, T](initialValue, param, name, countFailedValues)
 
 
 // TODO: The multi-thread support in accumulators is kind of lame; check
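
The `Accumulator` change above folds the removed auxiliary constructors into default parameter values. A small sketch, with purely illustrative names rather than Spark's classes, of why the old call shapes still compile against a single constructor:

// Illustrative only: mimics the constructor shape of the patched Accumulator.
class SketchAccumulator[T](
    val initialValue: T,
    val name: Option[String] = None,
    val countFailedValues: Boolean = false)

object SketchAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val a = new SketchAccumulator(0)                               // was: this(initialValue, param)
    val b = new SketchAccumulator(0, Some("x"))                    // was: this(initialValue, param, name)
    val c = new SketchAccumulator(0, Some("y"), countFailedValues = true)
    Seq(a, b, c).foreach(acc => println(acc.name -> acc.countFailedValues))
  }
}
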
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index e8f83c6d14b3743d89a95139a6b566ac08a37edb..43e555670dc694c6911eb0dab6c956189f07233c 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -36,7 +36,8 @@ private[spark] class TaskContextImpl(
     override val taskMemoryManager: TaskMemoryManager,
     localProperties: Properties,
     @transient private val metricsSystem: MetricsSystem,
-    override val taskMetrics: TaskMetrics = new TaskMetrics)
+    // The default value is only used in tests.
+    override val taskMetrics: TaskMetrics = TaskMetrics.empty)
   extends TaskContext
   with Logging {
 
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 8e9a332b7c556f28040ee3f8a930e49bd8efc367..f012a74db6c2c4d30a9fbf79faa360d2229377d1 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -101,9 +101,9 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
 
   /**
-   * Resets the value of the current metrics (`this`) and and merges all the independent
-   * [[ShuffleReadMetrics]] into `this`.
+   * Resets the value of the current metrics (`this`) and merges all the independent
+   * [[TempShuffleReadMetrics]] into `this`.
    */
-  private[spark] def setMergeValues(metrics: Seq[ShuffleReadMetrics]): Unit = {
+  private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = {
     _remoteBlocksFetched.setValue(_remoteBlocksFetched.zero)
     _localBlocksFetched.setValue(_localBlocksFetched.zero)
     _remoteBytesRead.setValue(_remoteBytesRead.zero)
@@ -119,5 +119,32 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
       _recordsRead.add(metric.recordsRead)
     }
   }
+}
 
+/**
+ * A temporary shuffle read metrics holder that is used to collect shuffle read metrics for each
+ * shuffle dependency. All the temporary metrics are merged into the task's [[ShuffleReadMetrics]]
+ * at the end.
+ */
+private[spark] class TempShuffleReadMetrics {
+  private[this] var _remoteBlocksFetched = 0
+  private[this] var _localBlocksFetched = 0
+  private[this] var _remoteBytesRead = 0L
+  private[this] var _localBytesRead = 0L
+  private[this] var _fetchWaitTime = 0L
+  private[this] var _recordsRead = 0L
+
+  def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched += v
+  def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched += v
+  def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v
+  def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
+  def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
+  def incRecordsRead(v: Long): Unit = _recordsRead += v
+
+  def remoteBlocksFetched: Int = _remoteBlocksFetched
+  def localBlocksFetched: Int = _localBlocksFetched
+  def remoteBytesRead: Long = _remoteBytesRead
+  def localBytesRead: Long = _localBytesRead
+  def fetchWaitTime: Long = _fetchWaitTime
+  def recordsRead: Long = _recordsRead
 }
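
The new `TempShuffleReadMetrics` above separates per-dependency collection (plain, unsynchronized fields) from the aggregate `ShuffleReadMetrics` that gets reported. A self-contained sketch of that pattern, using illustrative class names rather than Spark's:

import scala.collection.mutable.ArrayBuffer

// Per-reader holder: plain fields, no synchronization, mirroring TempShuffleReadMetrics.
class TempReadStats {
  var recordsRead: Long = 0L
  var fetchWaitTime: Long = 0L
  def incRecordsRead(v: Long): Unit = recordsRead += v
  def incFetchWaitTime(v: Long): Unit = fetchWaitTime += v
}

// Aggregate holder: hands out one temp per dependency, then rebuilds its totals in one merge.
class AggregateReadStats {
  private val temps = new ArrayBuffer[TempReadStats]
  var recordsRead: Long = 0L
  var fetchWaitTime: Long = 0L

  def createTemp(): TempReadStats = synchronized {
    val t = new TempReadStats
    temps += t
    t
  }

  // Like setMergeValues: reset first, then sum, so repeated merges never double count.
  def merge(): Unit = synchronized {
    recordsRead = 0L
    fetchWaitTime = 0L
    temps.foreach { t =>
      recordsRead += t.recordsRead
      fetchWaitTime += t.fetchWaitTime
    }
  }
}

object TempMergeDemo {
  def main(args: Array[String]): Unit = {
    val agg = new AggregateReadStats
    val t1 = agg.createTemp()
    val t2 = agg.createTemp()
    t1.incRecordsRead(10L)
    t2.incRecordsRead(10L)
    t2.incFetchWaitTime(5L)
    agg.merge()
    agg.merge() // idempotent because of the reset
    assert(agg.recordsRead == 20L && agg.fetchWaitTime == 5L)
    println("temp/merge checks passed")
  }
}

Each reader thread touches only its own holder, and only the synchronized merge writes the aggregate, which is what makes the unsynchronized temp fields safe.
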
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 4558fbb4d95d84c352c16b55242195c5d2f90063..8513d053f2e971dc1964c1aac39163c12c0f2d77 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -143,23 +143,23 @@ class TaskMetrics private[spark] () extends Serializable {
   val shuffleWriteMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics()
 
   /**
-   * Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency.
+   * A list of [[TempShuffleReadMetrics]], one per shuffle dependency.
    *
    * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
-   * issues from readers in different threads, in-progress tasks use a [[ShuffleReadMetrics]] for
-   * each dependency and merge these metrics before reporting them to the driver.
+   * issues from readers in different threads, in-progress tasks use a [[TempShuffleReadMetrics]]
+   * for each dependency and merge these metrics before reporting them to the driver.
    */
-  @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]
+  @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[TempShuffleReadMetrics]
 
   /**
-   * Create a temporary [[ShuffleReadMetrics]] for a particular shuffle dependency.
+   * Create a [[TempShuffleReadMetrics]] for a particular shuffle dependency.
    *
    * All usages are expected to be followed by a call to [[mergeShuffleReadMetrics]], which
    * merges the temporary values synchronously. Otherwise, all temporary data collected will
    * be lost.
    */
-  private[spark] def createTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized {
-    val readMetrics = new ShuffleReadMetrics
+  private[spark] def createTempShuffleReadMetrics(): TempShuffleReadMetrics = synchronized {
+    val readMetrics = new TempShuffleReadMetrics
     tempShuffleReadMetrics += readMetrics
     readMetrics
   }
@@ -195,9 +195,8 @@ class TaskMetrics private[spark] () extends Serializable {
    |        OTHER THINGS        |
    * ========================== */
 
-  private[spark] def registerAccums(sc: SparkContext): Unit = {
+  private[spark] def registerForCleanup(sc: SparkContext): Unit = {
     internalAccums.foreach { accum =>
-      Accumulators.register(accum)
       sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
     }
   }
@@ -244,7 +243,14 @@ private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) ext
 
 private[spark] object TaskMetrics extends Logging {
 
-  def empty: TaskMetrics = new TaskMetrics
+  /**
+   * Create an empty [[TaskMetrics]] that doesn't keep its internal accumulators registered.
+   */
+  def empty: TaskMetrics = {
+    val metrics = new TaskMetrics
+    metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
+    metrics
+  }
 
   /**
    * Create a new accumulator representing an internal task metric.
@@ -253,7 +259,7 @@ private[spark] object TaskMetrics extends Logging {
       initialValue: T,
       name: String,
       param: AccumulatorParam[T]): Accumulator[T] = {
-    new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
+    new Accumulator[T](initialValue, param, Some(name), countFailedValues = true)
   }
 
   def createLongAccum(name: String): Accumulator[Long] = {
@@ -281,6 +287,9 @@ private[spark] object TaskMetrics extends Logging {
   def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = {
     val definedAccumUpdates = accumUpdates.filter(_.update.isDefined)
     val metrics = new ListenerTaskMetrics(definedAccumUpdates)
+    // This [[ListenerTaskMetrics]] is never registered for cleanup and is only used to post
+    // events, so un-register all of its accumulators immediately.
+    metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
     definedAccumUpdates.filter(_.internal).foreach { accum =>
       metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get))
     }
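
Both `TaskMetrics.empty` and `fromAccumulatorUpdates` above construct the metrics and then immediately remove the freshly registered accumulators, because `Accumulable` now registers itself in its constructor. A minimal sketch of that register-then-remove lifecycle; `SketchRegistry` and its methods are assumptions that only mirror the shape of the real `Accumulators` object.

import scala.collection.mutable

// Stand-in registry: register/remove/contains are illustrative, not Spark's API surface.
object SketchRegistry {
  private val originals = mutable.Map[Long, SketchAccum]()
  private var lastId = -1L
  def newId(): Long = synchronized { lastId += 1; lastId }
  def register(acc: SketchAccum): Unit = synchronized { originals(acc.id) = acc }
  def remove(id: Long): Unit = synchronized { originals -= id }
  def contains(id: Long): Boolean = synchronized { originals.contains(id) }
}

// Like Accumulable, registration happens as a side effect of construction.
class SketchAccum {
  val id: Long = SketchRegistry.newId()
  SketchRegistry.register(this)
}

class SketchTaskMetrics {
  val internalAccums: Seq[SketchAccum] = Seq.fill(3)(new SketchAccum)
}

object SketchTaskMetrics {
  // Mirrors the new TaskMetrics.empty: build the instance, then immediately drop its
  // accumulators from the registry so a throwaway instance cannot leak entries.
  def empty: SketchTaskMetrics = {
    val m = new SketchTaskMetrics
    m.internalAccums.foreach(a => SketchRegistry.remove(a.id))
    m
  }
}

object EmptyMetricsDemo {
  def main(args: Array[String]): Unit = {
    val tracked = new SketchTaskMetrics
    val throwaway = SketchTaskMetrics.empty
    assert(tracked.internalAccums.forall(a => SketchRegistry.contains(a.id)))
    assert(throwaway.internalAccums.forall(a => !SketchRegistry.contains(a.id)))
    println("empty-metrics checks passed")
  }
}

Metrics that should stay tracked instead go through `registerForCleanup`, so the context cleaner can eventually remove their registry entries.
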
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index d5cf6b82e86f01a90a40ed68c6d980976eca8cc1..02185bf631fdcd946e76f71681ab57321467dbe8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -112,7 +112,7 @@ private[scheduler] abstract class Stage(
       numPartitionsToCompute: Int,
       taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
     val metrics = new TaskMetrics
-    metrics.registerAccums(rdd.sparkContext)
+    metrics.registerForCleanup(rdd.sparkContext)
     _latestInfo = StageInfo.fromStage(
       this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
     nextAttemptId += 1
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 58349fe250887114871159d996e2ba3d69b33f00..c513ed36d16803573bdc411f74f5f91ff5536e4e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -36,7 +36,7 @@ class StageInfo(
     val rddInfos: Seq[RDDInfo],
     val parentIds: Seq[Int],
     val details: String,
-    val taskMetrics: TaskMetrics = new TaskMetrics,
+    val taskMetrics: TaskMetrics = null,
     private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
@@ -81,7 +81,7 @@ private[spark] object StageInfo {
       stage: Stage,
       attemptId: Int,
       numTasks: Option[Int] = None,
-      taskMetrics: TaskMetrics = new TaskMetrics,
+      taskMetrics: TaskMetrics = null,
       taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
     ): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 9f2fa02c69ab1d07cf7ae7b47c87e89ef7ae164a..eb10f3e69b09249aab5108e511bbe1d1c9038127 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -46,14 +46,13 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti
- * @param partitionId index of the number in the RDD
+ * @param partitionId index of the partition in the RDD
  * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
  * @param localProperties copy of thread-local properties set by the user on the driver side.
- *
- * The default values for `metrics` and `localProperties` are used by tests only.
  */
 private[spark] abstract class Task[T](
     val stageId: Int,
     val stageAttemptId: Int,
     val partitionId: Int,
-    val metrics: TaskMetrics = new TaskMetrics,
+    // The default value is only used in tests.
+    val metrics: TaskMetrics = TaskMetrics.empty,
     @transient var localProperties: Properties = new Properties) extends Serializable {
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 9e4771ce4ac512052db8c5616b22fb4ab7732c1d..9ab7d96e290d647ad114cec92f7fb52fc196c6e0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -326,7 +326,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
-      val metrics = new TaskMetrics
+      val metrics = TaskMetrics.empty
       val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
         logWarning("Task start for unknown stage " + taskStart.stageId)
         new StageUIData
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6c50c72a91ef2dc132ec97337e9964099ca5d4b2..a613fbc5cc3b27fd11dd1123a121fe2884c20cef 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -750,10 +750,10 @@ private[spark] object JsonProtocol {
   }
 
   def taskMetricsFromJson(json: JValue): TaskMetrics = {
+    val metrics = TaskMetrics.empty
     if (json == JNothing) {
-      return TaskMetrics.empty
+      return metrics
     }
-    val metrics = new TaskMetrics
     metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
     metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
     metrics.setResultSize((json \ "Result Size").extract[Long])
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 454c42517ca1ba1cfa82a3e2c4990c03ed79830f..6063476936c7f288f9d5a10d5a403249e2aa5e8a 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -176,11 +176,10 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
   test("get accum") {
     sc = new SparkContext("local", "test")
     // Don't register with SparkContext for cleanup
-    var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+    var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
     val accId = acc.id
     val ref = WeakReference(acc)
     assert(ref.get.isDefined)
-    Accumulators.register(ref.get.get)
 
     // Remove the explicit reference to it and allow weak reference to get garbage collected
     acc = null
@@ -194,30 +193,19 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
 
     // Getting a normal accumulator. Note: this has to be separate because referencing an
     // accumulator above in an `assert` would keep it from being garbage collected.
-    val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true, true)
-    Accumulators.register(acc2)
+    val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true)
     assert(Accumulators.get(acc2.id) === Some(acc2))
 
     // Getting an accumulator that does not exist should return None
     assert(Accumulators.get(100000).isEmpty)
   }
 
-  test("only external accums are automatically registered") {
-    val accEx = new Accumulator(0, IntAccumulatorParam, Some("external"), internal = false)
-    val accIn = new Accumulator(0, IntAccumulatorParam, Some("internal"), internal = true)
-    assert(!accEx.isInternal)
-    assert(accIn.isInternal)
-    assert(Accumulators.get(accEx.id).isDefined)
-    assert(Accumulators.get(accIn.id).isEmpty)
-  }
-
   test("copy") {
-    val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), true, false)
+    val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), false)
     val acc2 = acc1.copy()
     assert(acc1.id === acc2.id)
     assert(acc1.value === acc2.value)
     assert(acc1.name === acc2.name)
-    assert(acc1.isInternal === acc2.isInternal)
     assert(acc1.countFailedValues === acc2.countFailedValues)
     assert(acc1 !== acc2)
     // Modifying one does not affect the other
@@ -230,15 +218,11 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
   }
 
   test("register multiple accums with same ID") {
-    // Make sure these are internal accums so we don't automatically register them already
-    val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+    val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
+    // `copy` will create a new Accumulable and register it.
     val acc2 = acc1.copy()
     assert(acc1 !== acc2)
     assert(acc1.id === acc2.id)
-    assert(Accumulators.originals.isEmpty)
-    assert(Accumulators.get(acc1.id).isEmpty)
-    Accumulators.register(acc1)
-    Accumulators.register(acc2)
     // The second one does not override the first one
     assert(Accumulators.originals.size === 1)
     assert(Accumulators.get(acc1.id) === Some(acc1))
@@ -275,14 +259,14 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
   }
 
   test("value is reset on the executors") {
-    val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
-    val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
+    val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"))
+    val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"))
     val externalAccums = Seq(acc1, acc2)
     val taskMetrics = new TaskMetrics
     // Set some values; these should not be observed later on the "executors"
     acc1.setValue(10)
     acc2.setValue(20L)
-    taskMetrics.testAccum.get.asInstanceOf[Accumulator[Long]].setValue(30L)
+    taskMetrics.testAccum.get.setValue(30L)
     // Simulate the task being serialized and sent to the executors.
     val dummyTask = new DummyTask(taskMetrics, externalAccums)
     val serInstance = new JavaSerializer(new SparkConf).newInstance()
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index fbc2fae08df24bfcca5904df50ca52b09f2d693c..ee70419727e861645545ab966836e7597d80ece3 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -178,11 +178,11 @@ class TaskMetricsSuite extends SparkFunSuite {
     val sr1 = tm.createTempShuffleReadMetrics()
     val sr2 = tm.createTempShuffleReadMetrics()
     val sr3 = tm.createTempShuffleReadMetrics()
-    sr1.setRecordsRead(10L)
-    sr2.setRecordsRead(10L)
-    sr1.setFetchWaitTime(1L)
-    sr2.setFetchWaitTime(2L)
-    sr3.setFetchWaitTime(3L)
+    sr1.incRecordsRead(10L)
+    sr2.incRecordsRead(10L)
+    sr1.incFetchWaitTime(1L)
+    sr2.incFetchWaitTime(2L)
+    sr3.incFetchWaitTime(3L)
     tm.mergeShuffleReadMetrics()
     assert(tm.shuffleReadMetrics.remoteBlocksFetched === 0L)
     assert(tm.shuffleReadMetrics.recordsRead === 20L)
@@ -198,8 +198,7 @@ class TaskMetricsSuite extends SparkFunSuite {
     val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
     val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b"))
     val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c"))
-    val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"),
-      internal = true, countFailedValues = true)
+    val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"), countFailedValues = true)
     tm.registerAccumulator(acc1)
     tm.registerAccumulator(acc2)
     tm.registerAccumulator(acc3)
@@ -219,9 +218,7 @@ class TaskMetricsSuite extends SparkFunSuite {
     assert(newUpdates(acc2.id).update === Some(2))
     assert(newUpdates(acc3.id).update === Some(0))
     assert(newUpdates(acc4.id).update === Some(0))
-    assert(!newUpdates(acc3.id).internal)
     assert(!newUpdates(acc3.id).countFailedValues)
-    assert(newUpdates(acc4.id).internal)
     assert(newUpdates(acc4.id).countFailedValues)
     assert(newUpdates.values.map(_.update).forall(_.isDefined))
     assert(newUpdates.values.map(_.value).forall(_.isEmpty))
@@ -230,7 +227,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("from accumulator updates") {
     val accumUpdates1 = TaskMetrics.empty.internalAccums.map { a =>
-      AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
+      AccumulableInfo(a.id, a.name, Some(3L), None, true, a.countFailedValues)
     }
     val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
     assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1)
@@ -239,16 +236,15 @@ class TaskMetricsSuite extends SparkFunSuite {
     // on the driver, internal or not, should be registered with `Accumulators` at some point.
     val param = IntAccumulatorParam
     val registeredAccums = Seq(
-      new Accumulator(0, param, Some("a"), internal = true, countFailedValues = true),
-      new Accumulator(0, param, Some("b"), internal = true, countFailedValues = false),
-      new Accumulator(0, param, Some("c"), internal = false, countFailedValues = true),
-      new Accumulator(0, param, Some("d"), internal = false, countFailedValues = false))
+      new Accumulator(0, param, Some("a"), countFailedValues = true),
+      new Accumulator(0, param, Some("b"), countFailedValues = false))
     val unregisteredAccums = Seq(
-      new Accumulator(0, param, Some("e"), internal = true, countFailedValues = true),
-      new Accumulator(0, param, Some("f"), internal = true, countFailedValues = false))
+      new Accumulator(0, param, Some("c"), countFailedValues = true),
+      new Accumulator(0, param, Some("d"), countFailedValues = false))
     registeredAccums.foreach(Accumulators.register)
-    registeredAccums.foreach { a => assert(Accumulators.originals.contains(a.id)) }
-    unregisteredAccums.foreach { a => assert(!Accumulators.originals.contains(a.id)) }
+    registeredAccums.foreach(a => assert(Accumulators.originals.contains(a.id)))
+    unregisteredAccums.foreach(a => Accumulators.remove(a.id))
+    unregisteredAccums.foreach(a => assert(!Accumulators.originals.contains(a.id)))
     // set some values in these accums
     registeredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
     unregisteredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
@@ -276,7 +272,6 @@ private[spark] object TaskMetricsSuite extends Assertions {
       assert(info1.name === info2.name)
       assert(info1.update === info2.update)
       assert(info1.value === info2.value)
-      assert(info1.internal === info2.internal)
       assert(info1.countFailedValues === info2.countFailedValues)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index bda4c996b27df6565a09e4202c2d671d8d923003..d55f6f60ece8692260fe187c3d1d24a96da574e2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -148,8 +148,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     sc = new SparkContext("local[1,4]", "test")
     val param = AccumulatorParam.LongAccumulatorParam
     // Create 2 accumulators, one that counts failed values and another that doesn't
-    val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
-    val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
+    val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
+    val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
     // Fail first 3 attempts of every task. This means each task should be run 4 times.
     sc.parallelize(1 to 10, 10).map { i =>
       acc1 += 1
@@ -169,8 +169,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
   test("failed tasks collect only accumulators whose values count during failures") {
     sc = new SparkContext("local", "test")
     val param = AccumulatorParam.LongAccumulatorParam
-    val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
-    val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
+    val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
+    val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
     // Create a dummy task. We won't end up running this; we just want to collect
     // accumulator updates from it.
     val taskMetrics = new TaskMetrics
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 7fa13907295b4c49175bd3281d0314dfae8b163f..92c31eac9594665fe02548322f44f27990a494f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -29,12 +29,11 @@ import org.apache.spark.util.Utils
  */
 private[sql] abstract class SQLMetric[R <: SQLMetricValue[T], T](
     name: String,
-    val param: SQLMetricParam[R, T])
-  extends Accumulable[R, T](param.zero, param, Some(name), internal = true) {
+    val param: SQLMetricParam[R, T]) extends Accumulable[R, T](param.zero, param, Some(name)) {
 
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    new AccumulableInfo(id, Some(name), update, value, isInternal, countFailedValues,
+    new AccumulableInfo(id, Some(name), update, value, true, countFailedValues,
       Some(SQLMetrics.ACCUM_IDENTIFIER))
   }