diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b86cfbfa48fbea9d5cddf000700ccb8d361a4686..34131984570e42acb74792bb0151642e9d3cfc43 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -164,7 +164,7 @@ class DAGScheduler(
    */
   def executorHeartbeatReceived(
       execId: String,
-      taskMetrics: Array[(Long, Int, TaskMetrics)], // (taskId, stageId, metrics)
+      taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics)
       blockManagerId: BlockManagerId): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
     implicit val timeout = Timeout(600 seconds)
@@ -677,7 +677,10 @@ class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
-    listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
+    // Note that there is a chance that this task is launched after the stage is cancelled.
+    // In that case, we wouldn't have the stage anymore in stageIdToStage.
+    val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
+    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
     submitWaitingStages()
   }
 
@@ -695,8 +698,8 @@ class DAGScheduler(
       // is in the process of getting stopped.
       val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
       runningStages.foreach { stage =>
-        stage.info.stageFailed(stageFailedMessage)
-        listenerBus.post(SparkListenerStageCompleted(stage.info))
+        stage.latestInfo.stageFailed(stageFailedMessage)
+        listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       }
       listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
     }
@@ -781,7 +784,16 @@ class DAGScheduler(
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
-    var tasks = ArrayBuffer[Task[_]]()
+
+    // First figure out the indexes of partition ids to compute.
+    val partitionsToCompute: Seq[Int] = {
+      if (stage.isShuffleMap) {
+        (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
+      } else {
+        val job = stage.resultOfJob.get
+        (0 until job.numPartitions).filter(id => !job.finished(id))
+      }
+    }
 
     val properties = if (jobIdToActiveJob.contains(jobId)) {
       jobIdToActiveJob(stage.jobId).properties
@@ -795,7 +807,8 @@ class DAGScheduler(
     // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
-    listenerBus.post(SparkListenerStageSubmitted(stage.info, properties))
+    stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
+    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
 
     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
     // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
@@ -826,20 +839,19 @@ class DAGScheduler(
         return
     }
 
-    if (stage.isShuffleMap) {
-      for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
-        val locs = getPreferredLocs(stage.rdd, p)
-        val part = stage.rdd.partitions(p)
-        tasks += new ShuffleMapTask(stage.id, taskBinary, part, locs)
+    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
+      partitionsToCompute.map { id =>
+        val locs = getPreferredLocs(stage.rdd, id)
+        val part = stage.rdd.partitions(id)
+        new ShuffleMapTask(stage.id, taskBinary, part, locs)
       }
     } else {
-      // This is a final stage; figure out its job's missing partitions
       val job = stage.resultOfJob.get
-      for (id <- 0 until job.numPartitions if !job.finished(id)) {
+      partitionsToCompute.map { id =>
         val p: Int = job.partitions(id)
         val part = stage.rdd.partitions(p)
         val locs = getPreferredLocs(stage.rdd, p)
-        tasks += new ResultTask(stage.id, taskBinary, part, locs, id)
+        new ResultTask(stage.id, taskBinary, part, locs, id)
       }
     }
 
@@ -869,11 +881,11 @@ class DAGScheduler(
       logDebug("New pending tasks: " + stage.pendingTasks)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      stage.info.submissionTime = Some(clock.getTime())
+      stage.latestInfo.submissionTime = Some(clock.getTime())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should post
       // SparkListenerStageCompleted here in case there are no tasks to run.
-      listenerBus.post(SparkListenerStageCompleted(stage.info))
+      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
       runningStages -= stage
@@ -892,8 +904,9 @@ class DAGScheduler(
     // The success case is dealt with separately below, since we need to compute accumulator
     // updates before posting.
     if (event.reason != Success) {
-      listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
-        event.taskMetrics))
+      val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
+      listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
+        event.taskInfo, event.taskMetrics))
     }
 
     if (!stageIdToStage.contains(task.stageId)) {
@@ -902,14 +915,19 @@ class DAGScheduler(
     }
     val stage = stageIdToStage(task.stageId)
 
-    def markStageAsFinished(stage: Stage) = {
-      val serviceTime = stage.info.submissionTime match {
+    def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
+      val serviceTime = stage.latestInfo.submissionTime match {
         case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
         case _ => "Unknown"
       }
-      logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-      stage.info.completionTime = Some(clock.getTime())
-      listenerBus.post(SparkListenerStageCompleted(stage.info))
+      if (errorMessage.isEmpty) {
+        logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
+        stage.latestInfo.completionTime = Some(clock.getTime())
+      } else {
+        stage.latestInfo.stageFailed(errorMessage.get)
+        logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
+      }
+      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
       runningStages -= stage
     }
     event.reason match {
@@ -924,7 +942,7 @@ class DAGScheduler(
                 val name = acc.name.get
                 val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
                 val stringValue = Accumulators.stringifyValue(acc.value)
-                stage.info.accumulables(id) = AccumulableInfo(id, name, stringValue)
+                stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
                 event.taskInfo.accumulables +=
                   AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
               }
@@ -935,8 +953,8 @@ class DAGScheduler(
               logError(s"Failed to update accumulators for $task", e)
           }
         }
-        listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
-          event.taskMetrics))
+        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
+          event.reason, event.taskInfo, event.taskMetrics))
         stage.pendingTasks -= task
         task match {
           case rt: ResultTask[_, _] =>
@@ -1029,6 +1047,7 @@ class DAGScheduler(
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
         // Mark the stage that the reducer was in as unrunnable
         val failedStage = stageIdToStage(task.stageId)
+        markStageAsFinished(failedStage, Some("Fetch failure"))
         runningStages -= failedStage
         // TODO: Cancel running tasks in the stage
         logInfo("Marking " + failedStage + " (" + failedStage.name +
@@ -1142,7 +1161,7 @@ class DAGScheduler(
     }
     val dependentJobs: Seq[ActiveJob] =
       activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
-    failedStage.info.completionTime = Some(clock.getTime())
+    failedStage.latestInfo.completionTime = Some(clock.getTime())
     for (job <- dependentJobs) {
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
     }
@@ -1182,8 +1201,8 @@ class DAGScheduler(
           if (runningStages.contains(stage)) {
             try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
               taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-              stage.info.stageFailed(failureReason)
-              listenerBus.post(SparkListenerStageCompleted(stage.info))
+              stage.latestInfo.stageFailed(failureReason)
+              listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
             } catch {
               case e: UnsupportedOperationException =>
                 logInfo(s"Could not cancel tasks for stage $stageId", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index d01d31863387768c04087fa931a18bc364bb6b12..86ca8445a112458b6a9d1c94907ae8bdb5ae0367 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -39,7 +39,8 @@ case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Propert
 case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends SparkListenerEvent
+case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
+  extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
@@ -47,6 +48,7 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 @DeveloperApi
 case class SparkListenerTaskEnd(
     stageId: Int,
+    stageAttemptId: Int,
     taskType: String,
     reason: TaskEndReason,
     taskInfo: TaskInfo,
@@ -75,10 +77,15 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
 @DeveloperApi
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
+/**
+ * Periodic updates from executors.
+ * @param execId executor id
+ * @param taskMetrics sequence of (task id, stage id, stage attempt, metrics)
+ */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
     execId: String,
-    taskMetrics: Seq[(Long, Int, TaskMetrics)])
+    taskMetrics: Seq[(Long, Int, Int, TaskMetrics)])
   extends SparkListenerEvent
 
 @DeveloperApi
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 800905413d1457acab380222fb7870ec02a994cd..071568cdfb429ccd609eefc540e2ace65db43f56 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -43,6 +43,9 @@ import org.apache.spark.util.CallSite
  * stage, the callSite gives the user code that created the RDD being shuffled. For a result
  * stage, the callSite gives the user code that executes the associated action (e.g. count()).
  *
+ * A single stage can consist of multiple attempts. In that case, the latestInfo field will
+ * be updated for each attempt.
+ *
  */
 private[spark] class Stage(
     val id: Int,
@@ -71,8 +74,8 @@ private[spark] class Stage(
   val name = callSite.shortForm
   val details = callSite.longForm
 
-  /** Pointer to the [StageInfo] object, set by DAGScheduler. */
-  var info: StageInfo = StageInfo.fromStage(this)
+  /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
+  var latestInfo: StageInfo = StageInfo.fromStage(this)
 
   def isAvailable: Boolean = {
     if (!isShuffleMap) {
@@ -116,6 +119,7 @@ private[spark] class Stage(
     }
   }
 
+  /** Return a new attempt id, starting with 0. */
   def newAttemptId(): Int = {
     val id = nextAttemptId
     nextAttemptId += 1
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 2a407e47a05bd2c8212176b4b19e5138321a2dda..c6dc3369ba5ccc2dc4fe8f152ccd6aa012ae671d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.storage.RDDInfo
 @DeveloperApi
 class StageInfo(
     val stageId: Int,
+    val attemptId: Int,
     val name: String,
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
@@ -56,9 +57,15 @@ private[spark] object StageInfo {
    * shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
    * sequence of narrow dependencies should also be associated with this Stage.
    */
-  def fromStage(stage: Stage): StageInfo = {
+  def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
-    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
+    new StageInfo(
+      stage.id,
+      stage.attemptId,
+      stage.name,
+      numTasks.getOrElse(stage.numTasks),
+      rddInfos,
+      stage.details)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6c0d1b2752a81b7dcd4793598a9690590f4e7dcf..ad051e59af86db3508cc81a0ab4c957b20565e3d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -333,12 +333,12 @@ private[spark] class TaskSchedulerImpl(
       execId: String,
       taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
       blockManagerId: BlockManagerId): Boolean = {
-    val metricsWithStageIds = taskMetrics.flatMap {
-      case (id, metrics) => {
+
+    val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
+      taskMetrics.flatMap { case (id, metrics) =>
         taskIdToTaskSetId.get(id)
           .flatMap(activeTaskSets.get)
-          .map(_.stageId)
-          .map(x => (id, x, metrics))
+          .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
       }
     }
     dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 613fa7850bb2567699958299f34a7c984872340e..c3ad325156f53786224a58fd48e50697c9c94344 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -31,9 +31,5 @@ private[spark] class TaskSet(
     val properties: Properties) {
     val id: String = stageId + "." + attempt
 
-  def kill(interruptThread: Boolean) {
-    tasks.foreach(_.kill(interruptThread))
-  }
-
   override def toString: String = "TaskSet " + id
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 0cc51c873727d9c7b193e0f83374fa2a8960f73c..2987dc04494a512e86d520404aa648b99fbce699 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -24,8 +24,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils}
 import org.apache.spark.ui.jobs.UIData.StageUIData
 import org.apache.spark.util.Utils
 
-/** Page showing executor summary */
-private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
+/** Stage summary grouped by executors. */
+private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobProgressTab) {
   private val listener = parent.listener
 
   def toNodeSeq: Seq[Node] = {
@@ -65,7 +65,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
       executorIdToAddress.put(executorId, address)
     }
 
-    listener.stageIdToData.get(stageId) match {
+    listener.stageIdToData.get((stageId, stageAttemptId)) match {
       case Some(stageData: StageUIData) =>
         stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
           <tr>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 74cd637d881557eadf58f1d10ae8db7773113ff2..f7f918fd521a9a5e5bceda0f35f7ff38d08b204e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -43,12 +43,16 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   // How many stages to remember
   val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
 
-  val activeStages = HashMap[Int, StageInfo]()
+  // Map from stageId to StageInfo
+  val activeStages = new HashMap[Int, StageInfo]
+
+  // Map from (stageId, attemptId) to StageUIData
+  val stageIdToData = new HashMap[(Int, Int), StageUIData]
+
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
 
-  val stageIdToData = new HashMap[Int, StageUIData]
-
+  // Map from pool name to a hash map (map from stage id to StageInfo).
   val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
 
   val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
@@ -59,9 +63,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stageInfo
-    val stageId = stage.stageId
-    val stageData = stageIdToData.getOrElseUpdate(stageId, {
-      logWarning("Stage completed for unknown stage " + stageId)
+    val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
+      logWarning("Stage completed for unknown stage " + stage.stageId)
       new StageUIData
     })
 
@@ -69,8 +72,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       stageData.accumulables(id) = info
     }
 
-    poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
-    activeStages.remove(stageId)
+    poolToActiveStages.get(stageData.schedulingPool).foreach { hashMap =>
+      hashMap.remove(stage.stageId)
+    }
+    activeStages.remove(stage.stageId)
     if (stage.failureReason.isEmpty) {
       completedStages += stage
       trimIfNecessary(completedStages)
@@ -84,7 +89,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
       val toRemove = math.max(retainedStages / 10, 1)
-      stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
+      stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) }
       stages.trimStart(toRemove)
     }
   }
@@ -98,21 +103,21 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
 
-    val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData)
+    val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
     stageData.schedulingPool = poolName
 
     stageData.description = Option(stageSubmitted.properties).flatMap {
       p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
     }
 
-    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
+    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo])
     stages(stage.stageId) = stage
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
-      val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
+      val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
         logWarning("Task start for unknown stage " + taskStart.stageId)
         new StageUIData
       })
@@ -128,8 +133,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val info = taskEnd.taskInfo
-    if (info != null) {
-      val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
+    // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
+    // compeletion event is for. Let's just drop it here. This means we might have some speculation
+    // tasks on the web ui that's never marked as complete.
+    if (info != null && taskEnd.stageAttemptId != -1) {
+      val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {
         logWarning("Task end for unknown stage " + taskEnd.stageId)
         new StageUIData
       })
@@ -222,8 +230,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   }
 
   override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) {
-    for ((taskId, sid, taskMetrics) <- executorMetricsUpdate.taskMetrics) {
-      val stageData = stageIdToData.getOrElseUpdate(sid, {
+    for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) {
+      val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), {
         logWarning("Metrics update for task in unknown stage " + sid)
         new StageUIData
       })
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index d4eb02722ad123f83c4470f7f2570ede6692d63e..db01be596e0731f2aba7b0fe29e14cfcb2df4017 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -34,7 +34,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
-      val stageDataOption = listener.stageIdToData.get(stageId)
+      val stageAttemptId = request.getParameter("attempt").toInt
+      val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId))
 
       if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
         val content =
@@ -42,14 +43,15 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
             <h4>Summary Metrics</h4> No tasks have started yet
             <h4>Tasks</h4> No tasks have started yet
           </div>
-        return UIUtils.headerSparkPage("Details for Stage %s".format(stageId), content, parent)
+        return UIUtils.headerSparkPage(
+          s"Details for Stage $stageId (Attempt $stageAttemptId)", content, parent)
       }
 
       val stageData = stageDataOption.get
       val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
 
       val numCompleted = tasks.count(_.taskInfo.finished)
-      val accumulables = listener.stageIdToData(stageId).accumulables
+      val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables
       val hasInput = stageData.inputBytes > 0
       val hasShuffleRead = stageData.shuffleReadBytes > 0
       val hasShuffleWrite = stageData.shuffleWriteBytes > 0
@@ -211,7 +213,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           def quantileRow(data: Seq[Node]): Seq[Node] = <tr>{data}</tr>
           Some(UIUtils.listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
         }
-      val executorTable = new ExecutorTable(stageId, parent)
+
+      val executorTable = new ExecutorTable(stageId, stageAttemptId, parent)
 
       val maybeAccumulableTable: Seq[Node] =
         if (accumulables.size > 0) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 16ad0df45aa0da914089b34c59195aed833cfe3d..2e67310594784f06b3587184977114afadce1278 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -97,8 +97,8 @@ private[ui] class StageTableBase(
     }
     // scalastyle:on
 
-    val nameLinkUri ="%s/stages/stage?id=%s"
-      .format(UIUtils.prependBaseUri(parent.basePath), s.stageId)
+    val nameLinkUri ="%s/stages/stage?id=%s&attempt=%s"
+      .format(UIUtils.prependBaseUri(parent.basePath), s.stageId, s.attemptId)
     val nameLink = <a href={nameLinkUri}>{s.name}</a>
 
     val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
@@ -121,7 +121,7 @@ private[ui] class StageTableBase(
     }
 
     val stageDesc = for {
-      stageData <- listener.stageIdToData.get(s.stageId)
+      stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
       desc <- stageData.description
     } yield {
       <div><em>{desc}</em></div>
@@ -131,7 +131,7 @@ private[ui] class StageTableBase(
   }
 
   protected def stageRow(s: StageInfo): Seq[Node] = {
-    val stageDataOption = listener.stageIdToData.get(s.stageId)
+    val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId))
     if (stageDataOption.isEmpty) {
       return <td>{s.stageId}</td><td>No data available for this stage</td>
     }
@@ -154,7 +154,11 @@ private[ui] class StageTableBase(
     val shuffleWrite = stageData.shuffleWriteBytes
     val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
 
-    <td>{s.stageId}</td> ++
+    {if (s.attemptId > 0) {
+      <td>{s.stageId} (retry {s.attemptId})</td>
+    } else {
+      <td>{s.stageId}</td>
+    }} ++
     {if (isFairScheduler) {
       <td>
         <a href={"%s/stages/pool?poolname=%s"
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 1e18ec688c40d1330a024675fb7359eabaf756e7..db7384705fc1b33c2a4ce239c1502cc4d52f46c1 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -96,6 +96,7 @@ private[spark] object JsonProtocol {
     val taskInfo = taskStart.taskInfo
     ("Event" -> Utils.getFormattedClassName(taskStart)) ~
     ("Stage ID" -> taskStart.stageId) ~
+    ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
     ("Task Info" -> taskInfoToJson(taskInfo))
   }
 
@@ -112,6 +113,7 @@ private[spark] object JsonProtocol {
     val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
     ("Event" -> Utils.getFormattedClassName(taskEnd)) ~
     ("Stage ID" -> taskEnd.stageId) ~
+    ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
     ("Task Type" -> taskEnd.taskType) ~
     ("Task End Reason" -> taskEndReason) ~
     ("Task Info" -> taskInfoToJson(taskInfo)) ~
@@ -187,6 +189,7 @@ private[spark] object JsonProtocol {
     val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
     val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
     ("Stage ID" -> stageInfo.stageId) ~
+    ("Stage Attempt ID" -> stageInfo.attemptId) ~
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
@@ -419,8 +422,9 @@ private[spark] object JsonProtocol {
 
   def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
     val stageId = (json \ "Stage ID").extract[Int]
+    val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
     val taskInfo = taskInfoFromJson(json \ "Task Info")
-    SparkListenerTaskStart(stageId, taskInfo)
+    SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
   }
 
   def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult = {
@@ -430,11 +434,12 @@ private[spark] object JsonProtocol {
 
   def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
     val stageId = (json \ "Stage ID").extract[Int]
+    val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
     val taskType = (json \ "Task Type").extract[String]
     val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
     val taskInfo = taskInfoFromJson(json \ "Task Info")
     val taskMetrics = taskMetricsFromJson(json \ "Task Metrics")
-    SparkListenerTaskEnd(stageId, taskType, taskEndReason, taskInfo, taskMetrics)
+    SparkListenerTaskEnd(stageId, stageAttemptId, taskType, taskEndReason, taskInfo, taskMetrics)
   }
 
   def jobStartFromJson(json: JValue): SparkListenerJobStart = {
@@ -492,6 +497,7 @@ private[spark] object JsonProtocol {
 
   def stageInfoFromJson(json: JValue): StageInfo = {
     val stageId = (json \ "Stage ID").extract[Int]
+    val attemptId = (json \ "Attempt ID").extractOpt[Int].getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_))
@@ -504,7 +510,7 @@ private[spark] object JsonProtocol {
       case None => Seq[AccumulableInfo]()
     }
 
-    val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
+    val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 51fb646a3cb617bf843c806d3fd35ed2d6835b6b..7671cb969a26b1e0d554b5efc57a73f55668246d 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -69,10 +69,10 @@ class StorageStatusListenerSuite extends FunSuite {
     // Task end with no updated blocks
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
   }
@@ -92,13 +92,13 @@ class StorageStatusListenerSuite extends FunSuite {
     // Task end with new blocks
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
@@ -111,13 +111,14 @@ class StorageStatusListenerSuite extends FunSuite {
     val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
     taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
     taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
     assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
     assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
@@ -135,8 +136,8 @@ class StorageStatusListenerSuite extends FunSuite {
     val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
     taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
     taskMetrics2.updatedBlocks = Some(Seq(block3))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics2))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
 
     // Unpersist RDD
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 147ec0bc52e39c905549500ee4231ddb620560ba..3370dd4156c3fde18088d00e6ae358e83c198cac 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -34,12 +34,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val listener = new JobProgressListener(conf)
 
     def createStageStartEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
+      val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
       SparkListenerStageSubmitted(stageInfo)
     }
 
     def createStageEndEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
+      val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
       SparkListenerStageCompleted(stageInfo)
     }
 
@@ -70,33 +70,37 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     var task = new ShuffleMapTask(0)
     val taskType = Utils.getFormattedClassName(task)
-    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
-      .shuffleRead === 1000)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    assert(listener.stageIdToData.getOrElse((0, 0), fail())
+      .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 1000)
 
     // finish a task with unknown executor-id, nothing should happen
     taskInfo =
       new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0)
-    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
     assert(listener.stageIdToData.size === 1)
 
     // finish this task, should get updated duration
     taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0)
-    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
-      .shuffleRead === 2000)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    assert(listener.stageIdToData.getOrElse((0, 0), fail())
+      .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 2000)
 
     // finish this task, should get updated duration
     taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0)
-    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
-      .shuffleRead === 1000)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    assert(listener.stageIdToData.getOrElse((0, 0), fail())
+      .executorSummary.getOrElse("exe-2", fail()).shuffleRead === 1000)
   }
 
   test("test task success vs failure counting for different task end reasons") {
@@ -119,16 +123,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       UnknownReason)
     var failCount = 0
     for (reason <- taskFailedReasons) {
-      listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
+      listener.onTaskEnd(
+        SparkListenerTaskEnd(task.stageId, 0, taskType, reason, taskInfo, metrics))
       failCount += 1
-      assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
-      assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
+      assert(listener.stageIdToData((task.stageId, 0)).numCompleteTasks === 0)
+      assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount)
     }
 
     // Make sure we count success as success.
-    listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
-    assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
-    assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(task.stageId, 1, taskType, Success, taskInfo, metrics))
+    assert(listener.stageIdToData((task.stageId, 1)).numCompleteTasks === 1)
+    assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount)
   }
 
   test("test update metrics") {
@@ -163,18 +169,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       taskInfo
     }
 
-    listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1234L)))
-    listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1235L)))
-    listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1236L)))
-    listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1237L)))
+    listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1234L)))
+    listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1235L)))
+    listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
+    listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
 
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
-      (1234L, 0, makeTaskMetrics(0)),
-      (1235L, 0, makeTaskMetrics(100)),
-      (1236L, 1, makeTaskMetrics(200)))))
+      (1234L, 0, 0, makeTaskMetrics(0)),
+      (1235L, 0, 0, makeTaskMetrics(100)),
+      (1236L, 1, 0, makeTaskMetrics(200)))))
 
-    var stage0Data = listener.stageIdToData.get(0).get
-    var stage1Data = listener.stageIdToData.get(1).get
+    var stage0Data = listener.stageIdToData.get((0, 0)).get
+    var stage1Data = listener.stageIdToData.get((1, 0)).get
     assert(stage0Data.shuffleReadBytes == 102)
     assert(stage1Data.shuffleReadBytes == 201)
     assert(stage0Data.shuffleWriteBytes == 106)
@@ -195,14 +201,14 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       .totalBlocksFetched == 202)
 
     // task that was included in a heartbeat
-    listener.onTaskEnd(SparkListenerTaskEnd(0, taskType, Success, makeTaskInfo(1234L, 1),
+    listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1),
       makeTaskMetrics(300)))
     // task that wasn't included in a heartbeat
-    listener.onTaskEnd(SparkListenerTaskEnd(1, taskType, Success, makeTaskInfo(1237L, 1),
+    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, makeTaskInfo(1237L, 1),
       makeTaskMetrics(400)))
 
-    stage0Data = listener.stageIdToData.get(0).get
-    stage1Data = listener.stageIdToData.get(1).get
+    stage0Data = listener.stageIdToData.get((0, 0)).get
+    stage1Data = listener.stageIdToData.get((1, 0)).get
     assert(stage0Data.shuffleReadBytes == 402)
     assert(stage1Data.shuffleReadBytes == 602)
     assert(stage0Data.shuffleWriteBytes == 406)
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 6e68dcb3425aa09c84dfc47fb53132e1462f91fb..b860177705d8415feadad1cc06c0667f58bf973c 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -53,7 +53,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     assert(storageListener.rddInfoList.isEmpty)
 
     // 2 RDDs are known, but none are cached
-    val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0, rddInfo1), "details")
+    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 2)
     assert(storageListener.rddInfoList.isEmpty)
@@ -63,7 +63,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val rddInfo3Cached = rddInfo3
     rddInfo2Cached.numCachedPartitions = 1
     rddInfo3Cached.numCachedPartitions = 1
-    val stageInfo1 = new StageInfo(1, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details")
+    val stageInfo1 = new StageInfo(1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
     assert(storageListener._rddInfoMap.size === 4)
     assert(storageListener.rddInfoList.size === 2)
@@ -71,7 +71,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     // Submitting RDDInfos with duplicate IDs does nothing
     val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY)
     rddInfo0Cached.numCachedPartitions = 1
-    val stageInfo0Cached = new StageInfo(0, "0", 100, Seq(rddInfo0), "details")
+    val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
     assert(storageListener._rddInfoMap.size === 4)
     assert(storageListener.rddInfoList.size === 2)
@@ -87,7 +87,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val rddInfo1Cached = rddInfo1
     rddInfo0Cached.numCachedPartitions = 1
     rddInfo1Cached.numCachedPartitions = 1
-    val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details")
+    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 2)
     assert(storageListener.rddInfoList.size === 2)
@@ -106,7 +106,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val myRddInfo0 = rddInfo0
     val myRddInfo1 = rddInfo1
     val myRddInfo2 = rddInfo2
-    val stageInfo0 = new StageInfo(0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
+    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
     bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 3)
@@ -116,7 +116,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     assert(!storageListener._rddInfoMap(2).isCached)
 
     // Task end with no updated blocks. This should not change anything.
-    bus.postToAll(SparkListenerTaskEnd(0, "obliteration", Success, taskInfo, new TaskMetrics))
+    bus.postToAll(SparkListenerTaskEnd(0, 0, "obliteration", Success, taskInfo, new TaskMetrics))
     assert(storageListener._rddInfoMap.size === 3)
     assert(storageListener.rddInfoList.size === 0)
 
@@ -128,7 +128,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
       (RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)),
       (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L))
     ))
-    bus.postToAll(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo, metrics1))
+    bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
     assert(storageListener._rddInfoMap(0).memSize === 800L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
     assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
@@ -150,7 +150,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
       (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually exist
       (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually exist
     ))
-    bus.postToAll(SparkListenerTaskEnd(2, "obliteration", Success, taskInfo, metrics2))
+    bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
     assert(storageListener._rddInfoMap(0).memSize === 400L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
     assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 97ffb07662482c54101ca8bd9322f4f75d9b0d2d..2fd3b9cfd221ace4fdaa193459f7862a25ea9e28 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -35,13 +35,13 @@ class JsonProtocolSuite extends FunSuite {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
     val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
-    val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false))
+    val taskStart = SparkListenerTaskStart(111, 0, makeTaskInfo(222L, 333, 1, 444L, false))
     val taskGettingResult =
       SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
-    val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+    val taskEnd = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
       makeTaskInfo(123L, 234, 67, 345L, false),
       makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false))
-    val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+    val taskEndWithHadoopInput = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
       makeTaskInfo(123L, 234, 67, 345L, false),
       makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
     val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
@@ -397,7 +397,8 @@ class JsonProtocolSuite extends FunSuite {
 
   private def assertJsonStringEquals(json1: String, json2: String) {
     val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "")
-    assert(formatJsonString(json1) === formatJsonString(json2))
+    assert(formatJsonString(json1) === formatJsonString(json2),
+      s"input ${formatJsonString(json1)} got ${formatJsonString(json2)}")
   }
 
   private def assertSeqEquals[T](seq1: Seq[T], seq2: Seq[T], assertEquals: (T, T) => Unit) {
@@ -485,7 +486,7 @@ class JsonProtocolSuite extends FunSuite {
 
   private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
     val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
-    val stageInfo = new StageInfo(a, "greetings", b, rddInfos, "details")
+    val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, "details")
     val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
     stageInfo.accumulables(acc1.id) = acc1
     stageInfo.accumulables(acc2.id) = acc2
@@ -558,84 +559,246 @@ class JsonProtocolSuite extends FunSuite {
 
   private val stageSubmittedJsonString =
     """
-      {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
-      "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details",
-      "Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      {"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]},"Properties":
-      {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
+      |{
+      |  "Event": "SparkListenerStageSubmitted",
+      |  "Stage Info": {
+      |    "Stage ID": 100,
+      |    "Stage Attempt ID": 0,
+      |    "Stage Name": "greetings",
+      |    "Number of Tasks": 200,
+      |    "RDD Info": [],
+      |    "Details": "details",
+      |    "Accumulables": [
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      }
+      |    ]
+      |  },
+      |  "Properties": {
+      |    "France": "Paris",
+      |    "Germany": "Berlin",
+      |    "Russia": "Moscow",
+      |    "Ukraine": "Kiev"
+      |  }
+      |}
     """
 
   private val stageCompletedJsonString =
     """
-      {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Stage Name":
-      "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage
-      Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
-      "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
-      "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details",
-      "Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      {"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]}}
+      |{
+      |  "Event": "SparkListenerStageCompleted",
+      |  "Stage Info": {
+      |    "Stage ID": 101,
+      |    "Stage Attempt ID": 0,
+      |    "Stage Name": "greetings",
+      |    "Number of Tasks": 201,
+      |    "RDD Info": [
+      |      {
+      |        "RDD ID": 101,
+      |        "Name": "mayor",
+      |        "Storage Level": {
+      |          "Use Disk": true,
+      |          "Use Memory": true,
+      |          "Use Tachyon": false,
+      |          "Deserialized": true,
+      |          "Replication": 1
+      |        },
+      |        "Number of Partitions": 201,
+      |        "Number of Cached Partitions": 301,
+      |        "Memory Size": 401,
+      |        "Tachyon Size": 0,
+      |        "Disk Size": 501
+      |      }
+      |    ],
+      |    "Details": "details",
+      |    "Accumulables": [
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      }
+      |    ]
+      |  }
+      |}
     """
 
   private val taskStartJsonString =
     """
-      |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
-      |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
-      |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
-      |"Failed":false,"Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
-      |"Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      |{"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]}}
+      |{
+      |  "Event": "SparkListenerTaskStart",
+      |  "Stage ID": 111,
+      |  "Stage Attempt ID": 0,
+      |  "Task Info": {
+      |    "Task ID": 222,
+      |    "Index": 333,
+      |    "Attempt": 1,
+      |    "Launch Time": 444,
+      |    "Executor ID": "executor",
+      |    "Host": "your kind sir",
+      |    "Locality": "NODE_LOCAL",
+      |    "Speculative": false,
+      |    "Getting Result Time": 0,
+      |    "Finish Time": 0,
+      |    "Failed": false,
+      |    "Accumulables": [
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      },
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 3,
+      |        "Name": "Accumulable3",
+      |        "Update": "delta3",
+      |        "Value": "val3"
+      |      }
+      |    ]
+      |  }
+      |}
     """.stripMargin
 
   private val taskGettingResultJsonString =
     """
-      |{"Event":"SparkListenerTaskGettingResult","Task Info":
-      |  {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
-      |   "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
-      |   "Finish Time":0,"Failed":false,
-      |   "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
-      |   "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      |   {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
+      |{
+      |  "Event": "SparkListenerTaskGettingResult",
+      |  "Task Info": {
+      |    "Task ID": 1000,
+      |    "Index": 2000,
+      |    "Attempt": 5,
+      |    "Launch Time": 3000,
+      |    "Executor ID": "executor",
+      |    "Host": "your kind sir",
+      |    "Locality": "NODE_LOCAL",
+      |    "Speculative": true,
+      |    "Getting Result Time": 0,
+      |    "Finish Time": 0,
+      |    "Failed": false,
+      |    "Accumulables": [
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      },
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 3,
+      |        "Name": "Accumulable3",
+      |        "Update": "delta3",
+      |        "Value": "val3"
+      |      }
+      |    ]
       |  }
       |}
     """.stripMargin
 
   private val taskEndJsonString =
     """
-      |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
-      |"Task End Reason":{"Reason":"Success"},
-      |"Task Info":{
-      |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
-      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,
-      |  "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
-      |  "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      |  {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
-      |},
-      |"Task Metrics":{
-      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
-      |  "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
-      |  "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
-      |  "Shuffle Read Metrics":{
-      |    "Shuffle Finish Time":900,
-      |    "Remote Blocks Fetched":800,
-      |    "Local Blocks Fetched":700,
-      |    "Fetch Wait Time":900,
-      |    "Remote Bytes Read":1000
+      |{
+      |  "Event": "SparkListenerTaskEnd",
+      |  "Stage ID": 1,
+      |  "Stage Attempt ID": 0,
+      |  "Task Type": "ShuffleMapTask",
+      |  "Task End Reason": {
+      |    "Reason": "Success"
       |  },
-      |  "Shuffle Write Metrics":{
-      |    "Shuffle Bytes Written":1200,
-      |    "Shuffle Write Time":1500
+      |  "Task Info": {
+      |    "Task ID": 123,
+      |    "Index": 234,
+      |    "Attempt": 67,
+      |    "Launch Time": 345,
+      |    "Executor ID": "executor",
+      |    "Host": "your kind sir",
+      |    "Locality": "NODE_LOCAL",
+      |    "Speculative": false,
+      |    "Getting Result Time": 0,
+      |    "Finish Time": 0,
+      |    "Failed": false,
+      |    "Accumulables": [
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      },
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 3,
+      |        "Name": "Accumulable3",
+      |        "Update": "delta3",
+      |        "Value": "val3"
+      |      }
+      |    ]
       |  },
-      |  "Updated Blocks":[
-      |    {"Block ID":"rdd_0_0",
-      |      "Status":{
-      |        "Storage Level":{
-      |          "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
-      |          "Replication":2
-      |        },
-      |        "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+      |  "Task Metrics": {
+      |    "Host Name": "localhost",
+      |    "Executor Deserialize Time": 300,
+      |    "Executor Run Time": 400,
+      |    "Result Size": 500,
+      |    "JVM GC Time": 600,
+      |    "Result Serialization Time": 700,
+      |    "Memory Bytes Spilled": 800,
+      |    "Disk Bytes Spilled": 0,
+      |    "Shuffle Read Metrics": {
+      |      "Shuffle Finish Time": 900,
+      |      "Remote Blocks Fetched": 800,
+      |      "Local Blocks Fetched": 700,
+      |      "Fetch Wait Time": 900,
+      |      "Remote Bytes Read": 1000
+      |    },
+      |    "Shuffle Write Metrics": {
+      |      "Shuffle Bytes Written": 1200,
+      |      "Shuffle Write Time": 1500
+      |    },
+      |    "Updated Blocks": [
+      |      {
+      |        "Block ID": "rdd_0_0",
+      |        "Status": {
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": false,
+      |            "Replication": 2
+      |          },
+      |          "Memory Size": 0,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 0
+      |        }
       |      }
-      |    }
       |    ]
       |  }
       |}
@@ -643,80 +806,187 @@ class JsonProtocolSuite extends FunSuite {
 
   private val taskEndWithHadoopInputJsonString =
     """
-      |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
-      |"Task End Reason":{"Reason":"Success"},
-      |"Task Info":{
-      |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
-      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,
-      |  "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
-      |  "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
-      |  {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
-      |},
-      |"Task Metrics":{
-      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
-      |  "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
-      |  "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
-      |  "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},
-      |  "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100},
-      |  "Updated Blocks":[
-      |    {"Block ID":"rdd_0_0",
-      |      "Status":{
-      |        "Storage Level":{
-      |          "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
-      |          "Replication":2
-      |        },
-      |        "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+      |{
+      |  "Event": "SparkListenerTaskEnd",
+      |  "Stage ID": 1,
+      |  "Stage Attempt ID": 0,
+      |  "Task Type": "ShuffleMapTask",
+      |  "Task End Reason": {
+      |    "Reason": "Success"
+      |  },
+      |  "Task Info": {
+      |    "Task ID": 123,
+      |    "Index": 234,
+      |    "Attempt": 67,
+      |    "Launch Time": 345,
+      |    "Executor ID": "executor",
+      |    "Host": "your kind sir",
+      |    "Locality": "NODE_LOCAL",
+      |    "Speculative": false,
+      |    "Getting Result Time": 0,
+      |    "Finish Time": 0,
+      |    "Failed": false,
+      |    "Accumulables": [
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      },
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 3,
+      |        "Name": "Accumulable3",
+      |        "Update": "delta3",
+      |        "Value": "val3"
       |      }
-      |    }
-      |  ]}
+      |    ]
+      |  },
+      |  "Task Metrics": {
+      |    "Host Name": "localhost",
+      |    "Executor Deserialize Time": 300,
+      |    "Executor Run Time": 400,
+      |    "Result Size": 500,
+      |    "JVM GC Time": 600,
+      |    "Result Serialization Time": 700,
+      |    "Memory Bytes Spilled": 800,
+      |    "Disk Bytes Spilled": 0,
+      |    "Shuffle Write Metrics": {
+      |      "Shuffle Bytes Written": 1200,
+      |      "Shuffle Write Time": 1500
+      |    },
+      |    "Input Metrics": {
+      |      "Data Read Method": "Hadoop",
+      |      "Bytes Read": 2100
+      |    },
+      |    "Updated Blocks": [
+      |      {
+      |        "Block ID": "rdd_0_0",
+      |        "Status": {
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": false,
+      |            "Replication": 2
+      |          },
+      |          "Memory Size": 0,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 0
+      |        }
+      |      }
+      |    ]
+      |  }
       |}
     """
 
   private val jobStartJsonString =
     """
-      {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties":
-      {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
+      |{
+      |  "Event": "SparkListenerJobStart",
+      |  "Job ID": 10,
+      |  "Stage IDs": [
+      |    1,
+      |    2,
+      |    3,
+      |    4
+      |  ],
+      |  "Properties": {
+      |    "France": "Paris",
+      |    "Germany": "Berlin",
+      |    "Russia": "Moscow",
+      |    "Ukraine": "Kiev"
+      |  }
+      |}
     """
 
   private val jobEndJsonString =
     """
-      {"Event":"SparkListenerJobEnd","Job ID":20,"Job Result":{"Result":"JobSucceeded"}}
+      |{
+      |  "Event": "SparkListenerJobEnd",
+      |  "Job ID": 20,
+      |  "Job Result": {
+      |    "Result": "JobSucceeded"
+      |  }
+      |}
     """
 
   private val environmentUpdateJsonString =
     """
-      {"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"GC speed":"9999 objects/s",
-      "Java home":"Land of coffee"},"Spark Properties":{"Job throughput":"80000 jobs/s,
-      regardless of job type"},"System Properties":{"Username":"guest","Password":"guest"},
-      "Classpath Entries":{"Super library":"/tmp/super_library"}}
+      |{
+      |  "Event": "SparkListenerEnvironmentUpdate",
+      |  "JVM Information": {
+      |    "GC speed": "9999 objects/s",
+      |    "Java home": "Land of coffee"
+      |  },
+      |  "Spark Properties": {
+      |    "Job throughput": "80000 jobs/s, regardless of job type"
+      |  },
+      |  "System Properties": {
+      |    "Username": "guest",
+      |    "Password": "guest"
+      |  },
+      |  "Classpath Entries": {
+      |    "Super library": "/tmp/super_library"
+      |  }
+      |}
     """
 
   private val blockManagerAddedJsonString =
     """
-      {"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"Stars",
-      "Host":"In your multitude...","Port":300,"Netty Port":400},"Maximum Memory":500}
+      |{
+      |  "Event": "SparkListenerBlockManagerAdded",
+      |  "Block Manager ID": {
+      |    "Executor ID": "Stars",
+      |    "Host": "In your multitude...",
+      |    "Port": 300,
+      |    "Netty Port": 400
+      |  },
+      |  "Maximum Memory": 500
+      |}
     """
 
   private val blockManagerRemovedJsonString =
     """
-      {"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"Scarce",
-      "Host":"to be counted...","Port":100,"Netty Port":200}}
+      |{
+      |  "Event": "SparkListenerBlockManagerRemoved",
+      |  "Block Manager ID": {
+      |    "Executor ID": "Scarce",
+      |    "Host": "to be counted...",
+      |    "Port": 100,
+      |    "Netty Port": 200
+      |  }
+      |}
     """
 
   private val unpersistRDDJsonString =
     """
-      {"Event":"SparkListenerUnpersistRDD","RDD ID":12345}
+      |{
+      |  "Event": "SparkListenerUnpersistRDD",
+      |  "RDD ID": 12345
+      |}
     """
 
   private val applicationStartJsonString =
     """
-      {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","Timestamp":42,
-      "User":"Garfield"}
+      |{
+      |  "Event": "SparkListenerApplicationStart",
+      |  "App Name": "The winner of all",
+      |  "Timestamp": 42,
+      |  "User": "Garfield"
+      |}
     """
 
   private val applicationEndJsonString =
     """
-      {"Event":"SparkListenerApplicationEnd","Timestamp":42}
+      |{
+      |  "Event": "SparkListenerApplicationEnd",
+      |  "Timestamp": 42
+      |}
     """
 }