From 4e9f0c2df6508db8fbe00901aa78fb5809e46010 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Mon, 12 Aug 2013 15:08:39 -0700
Subject: [PATCH] Capturing GC details in TaskMetrics

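Adds a jvmGCTime field to TaskMetrics and populates it in both the
standalone Executor and the LocalScheduler. GC time is measured by
sampling the cumulative collection time reported by the JVM's garbage
collector MXBeans immediately before a task starts and again when it
finishes (or fails), then recording the difference. In sketch form,
the pattern used in both code paths is:

    def getTotalGCTime =
      ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
    val startGCTime = getTotalGCTime   // sample before running the task
    // ... run the task ...
    m.jvmGCTime = getTotalGCTime - startGCTime   // attribute the delta to this task

StagePage also gains a "GC Time" column so the per-task GC time is
visible in the stage detail page of the web UI.
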
---
 .../main/scala/spark/executor/Executor.scala  | 22 ++++++++++++-------
 .../scala/spark/executor/TaskMetrics.scala    |  7 +++++-
 .../scheduler/local/LocalScheduler.scala      | 12 +++++++++-
 .../main/scala/spark/ui/jobs/StagePage.scala  |  5 +++++
 4 files changed, 36 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 8a74a8d853..6bfb132023 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -17,18 +17,17 @@
 
 package spark.executor
 
-import java.io.{File, FileOutputStream}
-import java.net.{URI, URL, URLClassLoader}
+import java.io.File
+import java.lang.management.ManagementFactory
+import java.nio.ByteBuffer
 import java.util.concurrent._
 
-import org.apache.hadoop.fs.FileUtil
-
-import scala.collection.mutable.{ArrayBuffer, Map, HashMap}
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
 
-import spark.broadcast._
 import spark.scheduler._
 import spark._
-import java.nio.ByteBuffer
+
 
 /**
  * The Mesos executor for Spark.
@@ -116,6 +115,9 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
       context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
       var attemptedTask: Option[Task[Any]] = None
       var taskStart: Long = 0
+      def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+      val startGCTime = getTotalGCTime
+
       try {
         SparkEnv.set(env)
         Accumulators.clear()
@@ -132,6 +134,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
           m.hostname = Utils.localHostName
           m.executorDeserializeTime = (taskStart - startTime).toInt
           m.executorRunTime = (taskFinish - taskStart).toInt
+          m.jvmGCTime = getTotalGCTime - startGCTime
         }
         //TODO I'd also like to track the time it takes to serialize the task results, but that is huge headache, b/c
         // we need to serialize the task metrics first.  If TaskMetrics had a custom serialized format, we could
@@ -155,7 +158,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         case t: Throwable => {
           val serviceTime = (System.currentTimeMillis() - taskStart).toInt
           val metrics = attemptedTask.flatMap(t => t.metrics)
-          metrics.foreach{m => m.executorRunTime = serviceTime}
+          metrics.foreach { m =>
+            m.executorRunTime = serviceTime
+            m.jvmGCTime = getTotalGCTime - startGCTime
+          }
           val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
           context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 3151627839..47b8890bee 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -31,13 +31,18 @@ class TaskMetrics extends Serializable {
   /**
    * Time the executor spends actually running the task (including fetching shuffle data)
    */
-  var executorRunTime:Int = _
+  var executorRunTime: Int = _
 
   /**
    * The number of bytes this task transmitted back to the driver as the TaskResult
    */
   var resultSize: Long = _
 
+  /**
+   * Amount of time the JVM spent in garbage collection while executing this task
+   */
+  var jvmGCTime: Long = _
+
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index f274b1a767..322c32543f 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -18,8 +18,11 @@
 package spark.scheduler.local
 
 import java.io.File
+import java.lang.management.ManagementFactory
 import java.util.concurrent.atomic.AtomicInteger
 import java.nio.ByteBuffer
+
+import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
@@ -173,6 +176,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     var attemptedTask: Option[Task[_]] = None   
     val start = System.currentTimeMillis()
     var taskStart: Long = 0
+    def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+    val startGCTime = getTotalGCTime
+
     try {
       Accumulators.clear()
       Thread.currentThread().setContextClassLoader(classLoader)
@@ -202,6 +208,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
       val serviceTime = System.currentTimeMillis() - taskStart
       logInfo("Finished " + taskId)
       deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
+      deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime
       deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
       val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
       val serializedResult = ser.serialize(taskResult)
@@ -210,7 +217,10 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
       case t: Throwable => {
         val serviceTime = System.currentTimeMillis() - taskStart
         val metrics = attemptedTask.flatMap(t => t.metrics)
-        metrics.foreach{m => m.executorRunTime = serviceTime.toInt}
+        metrics.foreach { m =>
+          m.executorRunTime = serviceTime.toInt
+          m.jvmGCTime = getTotalGCTime - startGCTime
+        }
         val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
         localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
       }
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 884c065dee..ce01f6c17e 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -85,6 +85,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
         Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
           {if (hasShuffleRead) Seq("Shuffle Read")  else Nil} ++
           {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+        Seq("GC Time") ++
         Seq("Details")
 
       val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
@@ -145,6 +146,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
       else metrics.map(m => m.executorRunTime).getOrElse(1)
     val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
       else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+    val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
 
     <tr>
       <td>{info.taskId}</td>
@@ -163,6 +165,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
         <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
           Utils.memoryBytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
       }}
+      <td sortable_customkey={gcTime.toString}>
+        {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
+      </td>
       <td>{exception.map(e =>
         <span>
           {e.className} ({e.description})<br/>
-- 
GitLab