diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0b0a60ee607d12a0a8d45021b6036b1438afb01b..0f19d7a96b0858a21b2aab65480f7616680543e2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -222,18 +222,22 @@ private[spark] class Executor(
           return
         }
 
+        val resultSer = SparkEnv.get.serializer.newInstance()
+        val beforeSerialization = System.currentTimeMillis()
+        val valueBytes = resultSer.serialize(value)
+        val afterSerialization = System.currentTimeMillis()
+
         for (m <- task.metrics) {
           m.hostname = Utils.localHostName()
           m.executorDeserializeTime = (taskStart - startTime).toInt
           m.executorRunTime = (taskFinish - taskStart).toInt
           m.jvmGCTime = gcTime - startGCTime
+          m.resultSerializationTime = afterSerialization - beforeSerialization
         }
-        // TODO I'd also like to track the time it takes to serialize the task results, but that is
-        // huge headache, b/c we need to serialize the task metrics first.  If TaskMetrics had a
-        // custom serialized format, we could just change the relevants bytes in the byte buffer
+
         val accumUpdates = Accumulators.values
 
-        val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null))
+        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
         val serializedResult = {
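
The timing added above is plain wall-clock bracketing: the return value is serialized once, with a fresh SerializerInstance, and the elapsed time is recorded in TaskMetrics before the metrics themselves are serialized into the DirectTaskResult (which is what made this measurement awkward before). A minimal standalone sketch of the same pattern, using a hypothetical timeMillis helper rather than Spark's actual code:

    // Run a block and return its result together with the wall-clock time it took, in ms.
    def timeMillis[A](body: => A): (A, Long) = {
      val start = System.currentTimeMillis()
      val result = body
      (result, System.currentTimeMillis() - start)
    }

    // e.g. val (valueBytes, serTimeMs) = timeMillis(resultSer.serialize(value))
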
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index c0ce46e379344ef24d9ee0cfc538cb4fdd6b9062..bb1471d9ee16addf340d364f03ac0c6068e2eecc 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -43,6 +43,11 @@ class TaskMetrics extends Serializable {
    */
   var jvmGCTime: Long = _
 
+  /**
+   * Amount of time spent serializing the task result, in milliseconds
+   */
+  var resultSerializationTime: Long = _
+
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 7e468d0d678081c3c0ab3350c22f51993e14afe1..e80cc6b0f64e81ff3588c92bfea1cfb4acb1624d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -35,18 +35,15 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se
 
 /** A TaskResult that contains the task's return value and accumulator updates. */
 private[spark]
-class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
   extends TaskResult[T] with Externalizable {
 
-  def this() = this(null.asInstanceOf[T], null, null)
+  def this() = this(null.asInstanceOf[ByteBuffer], null, null)
 
   override def writeExternal(out: ObjectOutput) {
 
-    val objectSer = SparkEnv.get.serializer.newInstance()
-    val bb = objectSer.serialize(value)
-
-    out.writeInt(bb.remaining())
-    Utils.writeByteBuffer(bb, out)
+    out.writeInt(valueBytes.remaining())
+    Utils.writeByteBuffer(valueBytes, out)
 
     out.writeInt(accumUpdates.size)
     for ((key, value) <- accumUpdates) {
@@ -58,12 +55,10 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
 
   override def readExternal(in: ObjectInput) {
 
-    val objectSer = SparkEnv.get.serializer.newInstance()
-
     val blen = in.readInt()
     val byteVal = new Array[Byte](blen)
     in.readFully(byteVal)
-    value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
+    valueBytes = ByteBuffer.wrap(byteVal)
 
     val numUpdates = in.readInt
     if (numUpdates == 0) {
@@ -76,4 +71,9 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
     }
     metrics = in.readObject().asInstanceOf[TaskMetrics]
   }
+
+  def value(): T = {
+    val resultSer = SparkEnv.get.serializer.newInstance()
+    resultSer.deserialize(valueBytes)
+  }
 }
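
With this change a DirectTaskResult carries the already-serialized result bytes rather than the live value: writeExternal/readExternal just move a length-prefixed byte array across the wire, and deserialization is deferred until value() is called by the reader. A rough sketch of that framing, using plain java.io calls in place of Spark's Utils.writeByteBuffer helper:

    import java.io.{ObjectInput, ObjectOutput}
    import java.nio.ByteBuffer

    // Write the buffer's remaining bytes, prefixed by their length.
    def writeFramed(out: ObjectOutput, bb: ByteBuffer) {
      val bytes = new Array[Byte](bb.remaining())
      bb.duplicate().get(bytes)  // copy without disturbing the buffer's position
      out.writeInt(bytes.length)
      out.write(bytes)
    }

    // Read a length-prefixed byte array back into a ByteBuffer.
    def readFramed(in: ObjectInput): ByteBuffer = {
      val bytes = new Array[Byte](in.readInt())
      in.readFully(bytes)
      ByteBuffer.wrap(bytes)
    }
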
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 69f9446babd9511643079355beb51e3a683eabe0..996e1b4d1aa51e805640b939e9834b9587bb9419 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
 
       val taskHeaders: Seq[String] =
         Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
-        Seq("Duration", "GC Time") ++
+        Seq("Duration", "GC Time", "Result Ser Time") ++
         {if (hasShuffleRead) Seq("Shuffle Read")  else Nil} ++
         {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
         Seq("Errors")
@@ -101,6 +101,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
           None
         }
         else {
+          val serializationTimes = validTasks.map{case (info, metrics, exception) =>
+            metrics.get.resultSerializationTime.toDouble}
+          val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
+            ms => parent.formatDuration(ms.toLong))
+
           val serviceTimes = validTasks.map{case (info, metrics, exception) =>
             metrics.get.executorRunTime.toDouble}
           val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
@@ -149,6 +154,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
           val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
 
           val listings: Seq[Seq[String]] = Seq(
+            serializationQuantiles,
             serviceQuantiles,
             gettingResultQuantiles,
             schedulerDelayQuantiles,
@@ -183,6 +189,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
       else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
     val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
+    val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L)
 
     val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead}
     val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
@@ -210,6 +217,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
       <td sorttable_customkey={gcTime.toString}>
         {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
       </td>
+      <td sorttable_customkey={serializationTime.toString}>
+        {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
+      </td>
       {if (shuffleRead) {
          <td sorttable_customkey={shuffleReadSortable}>
            {shuffleReadReadable}
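
The summary-table addition reuses the existing quantile-row pattern: collect the per-task values as Doubles, let org.apache.spark.util.Distribution compute its default quantiles, and prepend a row label before formatting each entry as a duration. Condensed into a sketch (formatMs stands in for parent.formatDuration and is not part of the patch):

    import org.apache.spark.util.Distribution

    // One summary row: a label followed by the default quantiles, formatted as durations.
    // Callers must ensure `times` is non-empty, as StagePage does by checking validTasks first.
    def quantileRow(label: String, times: Seq[Double], formatMs: Long => String): Seq[String] =
      label +: Distribution(times).get.getQuantiles().map(ms => formatMs(ms.toLong))
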
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index 29c4cc5d9caade20fde9776b4c6f9e16b8105c9a..bb28a31a990bc78874892dd543f9557b255bbb1f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -313,6 +313,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
   }
 
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
-    new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
+    val valueSer = SparkEnv.get.serializer.newInstance()
+    new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
   }
 }
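
Because DirectTaskResult now expects pre-serialized bytes, the test has to push the value through a SerializerInstance first; reading it back goes through value(). An illustrative round trip, assuming a SparkEnv is available as it is in this suite via LocalSparkContext:

    val ser = SparkEnv.get.serializer.newInstance()
    val result = new DirectTaskResult[Int](ser.serialize(42), mutable.Map.empty, new TaskMetrics)
    assert(result.value() === 42)  // the bytes are only deserialized here, on the reader's side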