Skip to content
Snippets Groups Projects
Commit 4e9f0c2d authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Capturing GC details in TaskMetrics

parent f0382007
No related branches found
No related tags found
No related merge requests found
......@@ -17,18 +17,17 @@
package spark.executor
import java.io.{File, FileOutputStream}
import java.net.{URI, URL, URLClassLoader}
import java.io.{File}
import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent._
import org.apache.hadoop.fs.FileUtil
import scala.collection.mutable.{ArrayBuffer, Map, HashMap}
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import spark.broadcast._
import spark.scheduler._
import spark._
import java.nio.ByteBuffer
/**
* The Mesos executor for Spark.
......@@ -116,6 +115,9 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var attemptedTask: Option[Task[Any]] = None
var taskStart: Long = 0
def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
val startGCTime = getTotalGCTime
try {
SparkEnv.set(env)
Accumulators.clear()
......@@ -132,6 +134,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
m.hostname = Utils.localHostName
m.executorDeserializeTime = (taskStart - startTime).toInt
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = getTotalGCTime - startGCTime
}
//TODO I'd also like to track the time it takes to serialize the task results, but that is huge headache, b/c
// we need to serialize the task metrics first. If TaskMetrics had a custom serialized format, we could
......@@ -155,7 +158,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
case t: Throwable => {
val serviceTime = (System.currentTimeMillis() - taskStart).toInt
val metrics = attemptedTask.flatMap(t => t.metrics)
metrics.foreach{m => m.executorRunTime = serviceTime}
metrics.foreach{m =>
m.executorRunTime = serviceTime
m.jvmGCTime = getTotalGCTime - startGCTime
}
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
......
......@@ -31,13 +31,18 @@ class TaskMetrics extends Serializable {
/**
* Time the executor spends actually running the task (including fetching shuffle data)
*/
var executorRunTime:Int = _
var executorRunTime: Int = _
/**
* The number of bytes this task transmitted back to the driver as the TaskResult
*/
var resultSize: Long = _
/**
* Amount of time the JVM spent in garbage collection while executing this task
*/
var jvmGCTime: Long = _
/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
......
......@@ -18,8 +18,11 @@
package spark.scheduler.local
import java.io.File
import java.lang.management.ManagementFactory
import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
......@@ -31,6 +34,7 @@ import spark.scheduler._
import spark.scheduler.cluster._
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import akka.actor._
import management.ManagementFactory
/**
* A FIFO or Fair TaskScheduler implementation that runs tasks locally in a thread pool. Optionally
......@@ -173,6 +177,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var attemptedTask: Option[Task[_]] = None
val start = System.currentTimeMillis()
var taskStart: Long = 0
def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
val startGCTime = getTotalGCTime
try {
Accumulators.clear()
Thread.currentThread().setContextClassLoader(classLoader)
......@@ -202,6 +209,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
val serviceTime = System.currentTimeMillis() - taskStart
logInfo("Finished " + taskId)
deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime
deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
val serializedResult = ser.serialize(taskResult)
......@@ -210,7 +218,10 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
case t: Throwable => {
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
metrics.foreach{m => m.executorRunTime = serviceTime.toInt}
metrics.foreach{ m =>
m.executorRunTime = serviceTime.toInt
m.jvmGCTime = getTotalGCTime - startGCTime
}
val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
}
......
......@@ -85,6 +85,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
Seq("GC Time") ++
Seq("Details")
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
......@@ -145,6 +146,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
else metrics.map(m => m.executorRunTime).getOrElse(1)
val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
<tr>
<td>{info.taskId}</td>
......@@ -163,6 +165,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
<td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
Utils.memoryBytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
}}
<td sortable_customkey={gcTime}>
{if (gcTime > 0) {parent.formatDuration(gcTime)} else ""}
</td>
<td>{exception.map(e =>
<span>
{e.className} ({e.description})<br/>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment