diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 8a74a8d853afd8abec3cc314c8415abdcc30a113..6bfb1320233435cf04712c0f3ba3d22fd6b6eb4b 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -17,18 +17,17 @@
 package spark.executor
 
-import java.io.{File, FileOutputStream}
-import java.net.{URI, URL, URLClassLoader}
+import java.io.File
+import java.lang.management.ManagementFactory
+import java.nio.ByteBuffer
 import java.util.concurrent._
 
-import org.apache.hadoop.fs.FileUtil
-
-import scala.collection.mutable.{ArrayBuffer, Map, HashMap}
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
 
-import spark.broadcast._
 import spark.scheduler._
 import spark._
-import java.nio.ByteBuffer
+
 
 /**
  * The Mesos executor for Spark.
  */
@@ -116,6 +115,9 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
       context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
       var attemptedTask: Option[Task[Any]] = None
       var taskStart: Long = 0
+      def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+      val startGCTime = getTotalGCTime
+
       try {
         SparkEnv.set(env)
         Accumulators.clear()
@@ -132,6 +134,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
           m.hostname = Utils.localHostName
           m.executorDeserializeTime = (taskStart - startTime).toInt
           m.executorRunTime = (taskFinish - taskStart).toInt
+          m.jvmGCTime = getTotalGCTime - startGCTime
         }
         //TODO I'd also like to track the time it takes to serialize the task results, but that is huge headache, b/c
         // we need to serialize the task metrics first. If TaskMetrics had a custom serialized format, we could
@@ -155,7 +158,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         case t: Throwable => {
           val serviceTime = (System.currentTimeMillis() - taskStart).toInt
           val metrics = attemptedTask.flatMap(t => t.metrics)
-          metrics.foreach{m => m.executorRunTime = serviceTime}
+          metrics.foreach{m =>
+            m.executorRunTime = serviceTime
+            m.jvmGCTime = getTotalGCTime - startGCTime
+          }
           val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
           context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 315162783981f8e2d18e61f86f90c529c4923492..47b8890bee96dc1ef0a2d1fdb7c6e1418e20960f 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -31,13 +31,18 @@ class TaskMetrics extends Serializable {
   /**
    * Time the executor spends actually running the task (including fetching shuffle data)
    */
-  var executorRunTime:Int = _
+  var executorRunTime: Int = _
 
   /**
    * The number of bytes this task transmitted back to the driver as the TaskResult
    */
   var resultSize: Long = _
 
+  /**
+   * Amount of time the JVM spent in garbage collection while executing this task
+   */
+  var jvmGCTime: Long = _
+
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
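Both the success path and the failure path above use the same measurement pattern: sample the JVM's cumulative GC time via the GarbageCollectorMXBeans before the task runs, sample again when it ends, and record the delta as jvmGCTime. A minimal standalone sketch of that pattern, where the object name and the allocation loop are hypothetical, not Spark code:

```scala
import java.lang.management.ManagementFactory

import scala.collection.JavaConversions._

object GCTimingSketch {
  // Cumulative collection time (ms) summed over all collectors, mirroring
  // the getTotalGCTime helper this patch defines inside the task runner.
  def getTotalGCTime: Long =
    ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum

  def main(args: Array[String]) {
    val startGCTime = getTotalGCTime
    // Hypothetical workload: allocate and discard buffers to provoke GC.
    var i = 0
    while (i < 5000) {
      val discard = new Array[Byte](1024 * 1024)
      i += 1
    }
    println("jvmGCTime (ms): " + (getTotalGCTime - startGCTime))
  }
}
```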
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index f274b1a767984524df7b0fba6d92f9045c6ce312..322c32543f72067cb88bce53eac7a291fcc181f4 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -18,8 +18,11 @@
 package spark.scheduler.local
 
 import java.io.File
+import java.lang.management.ManagementFactory
 import java.util.concurrent.atomic.AtomicInteger
 import java.nio.ByteBuffer
+
+import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
@@ -173,6 +176,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
       var attemptedTask: Option[Task[_]] = None
       val start = System.currentTimeMillis()
       var taskStart: Long = 0
+      def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+      val startGCTime = getTotalGCTime
+
       try {
         Accumulators.clear()
         Thread.currentThread().setContextClassLoader(classLoader)
@@ -202,6 +208,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
         val serviceTime = System.currentTimeMillis() - taskStart
         logInfo("Finished " + taskId)
         deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
+        deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime
         deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
         val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
         val serializedResult = ser.serialize(taskResult)
@@ -210,7 +217,10 @@
       case t: Throwable => {
         val serviceTime = System.currentTimeMillis() - taskStart
         val metrics = attemptedTask.flatMap(t => t.metrics)
-        metrics.foreach{m => m.executorRunTime = serviceTime.toInt}
+        metrics.foreach{ m =>
+          m.executorRunTime = serviceTime.toInt
+          m.jvmGCTime = getTotalGCTime - startGCTime
+        }
         val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
         localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
       }
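LocalScheduler duplicates the same getTotalGCTime helper for local mode. One caveat worth noting: getCollectionTime is documented to return -1 when a collector does not track collection time, so the summed value could in principle be skewed on such JVMs. A small sketch, with a hypothetical object name, for inspecting what the beans report on a given JVM:

```scala
import java.lang.management.ManagementFactory

import scala.collection.JavaConversions._

object ListCollectors {
  def main(args: Array[String]) {
    // Collector names vary by JVM and GC flags (e.g. "PS Scavenge", "ParNew").
    for (gc <- ManagementFactory.getGarbageCollectorMXBeans) {
      // getCollectionTime is cumulative milliseconds, or -1 if untracked.
      println(gc.getName + ": count=" + gc.getCollectionCount +
        " time=" + gc.getCollectionTime + "ms")
    }
  }
}
```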
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 884c065deecac9959dcb88704a866968e099e436..ce01f6c17ee3bc78ccb0ae62352ee82d931d7ac8 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -85,6 +85,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
       Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
       {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
       {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+      Seq("GC Time") ++
       Seq("Details")
 
     val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
@@ -145,6 +146,7 @@
       else metrics.map(m => m.executorRunTime).getOrElse(1)
     val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
       else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+    val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
 
     <tr>
       <td>{info.taskId}</td>
@@ -163,6 +165,9 @@
       <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
         Utils.memoryBytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
       }}
+      <td sortable_customkey={gcTime.toString}>
+        {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
+      </td>
       <td>{exception.map(e =>
         <span>
           {e.className} ({e.description})<br/>
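The new cell keeps the raw millisecond value in the sortable_customkey attribute, converted to a string because scala.xml attribute values embedded with {} must be strings or nodes rather than a raw Long, while the visible text is the formatted duration; that way the table sorter can order rows numerically. A minimal sketch of the cell in isolation, where formatDuration is a stand-in for parent.formatDuration:

```scala
import scala.xml.Node

object SortableCellSketch {
  // Stand-in for JobProgressUI.formatDuration.
  def formatDuration(ms: Long): String = ms + " ms"

  // Numeric sort key in the attribute, human-readable text in the cell.
  def gcCell(gcTime: Long): Node =
    <td sortable_customkey={gcTime.toString}>
      {if (gcTime > 0) formatDuration(gcTime) else ""}
    </td>

  def main(args: Array[String]) {
    println(gcCell(347L))
  }
}
```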