Commit eca68d44 authored by Patrick Wendell

Merge pull request #272 from tmyklebu/master

Track and report task result serialisation time.

 - DirectTaskResult now has a ByteBuffer valueBytes instead of a T value.
 - DirectTaskResult now has a member function T value() that deserialises valueBytes.
 - Executor serialises value into a ByteBuffer and passes it to DTR's ctor.
 - Executor tracks the time taken to do so and puts it in a new field in TaskMetrics.
 - StagePage now reports serialisation time from TaskMetrics along with the other things it reported.
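
The flow the bullets describe, end to end: the executor serializes the result value itself, measures how long that takes, records the duration in TaskMetrics, and ships only the bytes; the value is deserialized again only when someone asks for it. Below is a minimal, self-contained sketch of that pattern, using plain JDK serialization in place of SparkEnv's pluggable serializer; every name in it is illustrative and not part of this patch.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object ResultSerializationSketch {
  // Holds the pre-serialized result plus the time it took to produce it,
  // mirroring the DirectTaskResult/TaskMetrics split in this commit.
  final case class TimedResult(valueBytes: ByteBuffer, serializationTimeMs: Long) {
    // Deserialize on demand, analogous to DirectTaskResult.value().
    def value[T](): T = {
      val in = new ObjectInputStream(new ByteArrayInputStream(valueBytes.array()))
      in.readObject().asInstanceOf[T]
    }
  }

  // Serialize the value once and record the wall-clock time spent doing so.
  def serializeAndTime(value: AnyRef): TimedResult = {
    val before = System.currentTimeMillis()
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(value)
    oos.close()
    val after = System.currentTimeMillis()
    TimedResult(ByteBuffer.wrap(bos.toByteArray), after - before)
  }

  def main(args: Array[String]): Unit = {
    val res = serializeAndTime(Vector(1, 2, 3))
    println(s"serialization took ${res.serializationTimeMs} ms, value = ${res.value[Vector[Int]]()}")
  }
}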
parents 7990c563 d3b1af4b
@@ -222,18 +222,22 @@ private[spark] class Executor(
           return
         }
+        val resultSer = SparkEnv.get.serializer.newInstance()
+        val beforeSerialization = System.currentTimeMillis()
+        val valueBytes = resultSer.serialize(value)
+        val afterSerialization = System.currentTimeMillis()
         for (m <- task.metrics) {
           m.hostname = Utils.localHostName()
           m.executorDeserializeTime = (taskStart - startTime).toInt
           m.executorRunTime = (taskFinish - taskStart).toInt
           m.jvmGCTime = gcTime - startGCTime
+          m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
         }
-        // TODO I'd also like to track the time it takes to serialize the task results, but that is
-        // huge headache, b/c we need to serialize the task metrics first.  If TaskMetrics had a
-        // custom serialized format, we could just change the relevants bytes in the byte buffer
         val accumUpdates = Accumulators.values
-        val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null))
+        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
         val serializedResult = {
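
The hunk above brackets resultSer.serialize(value) with two System.currentTimeMillis() calls and stores the difference in the new metric. That bracketing can also be factored into a small helper; the following is a hedged sketch with illustrative names, not part of the patch.

object Timed {
  // Runs `body` and returns its result together with the elapsed wall-clock milliseconds.
  def millis[A](body: => A): (A, Long) = {
    val start = System.currentTimeMillis()
    val result = body
    (result, System.currentTimeMillis() - start)
  }
}

// Hypothetical usage mirroring the hunk above:
//   val (valueBytes, resultSerializationTime) = Timed.millis(resultSer.serialize(value))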
@@ -43,6 +43,11 @@ class TaskMetrics extends Serializable {
    */
   var jvmGCTime: Long = _
 
+  /**
+   * Amount of time spent serializing the task result
+   */
+  var resultSerializationTime: Long = _
+
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
@@ -35,18 +35,15 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se
 /** A TaskResult that contains the task's return value and accumulator updates. */
 private[spark]
-class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
   extends TaskResult[T] with Externalizable {
 
-  def this() = this(null.asInstanceOf[T], null, null)
+  def this() = this(null.asInstanceOf[ByteBuffer], null, null)
 
   override def writeExternal(out: ObjectOutput) {
-    val objectSer = SparkEnv.get.serializer.newInstance()
-    val bb = objectSer.serialize(value)
-    out.writeInt(bb.remaining())
-    Utils.writeByteBuffer(bb, out)
+    out.writeInt(valueBytes.remaining);
+    Utils.writeByteBuffer(valueBytes, out)
     out.writeInt(accumUpdates.size)
     for ((key, value) <- accumUpdates) {
@@ -58,12 +55,10 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
   override def readExternal(in: ObjectInput) {
-    val objectSer = SparkEnv.get.serializer.newInstance()
     val blen = in.readInt()
     val byteVal = new Array[Byte](blen)
     in.readFully(byteVal)
-    value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
+    valueBytes = ByteBuffer.wrap(byteVal)
 
     val numUpdates = in.readInt
     if (numUpdates == 0) {
@@ -76,4 +71,9 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
     }
     metrics = in.readObject().asInstanceOf[TaskMetrics]
   }
+
+  def value(): T = {
+    val resultSer = SparkEnv.get.serializer.newInstance()
+    return resultSer.deserialize(valueBytes)
+  }
 }
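
DirectTaskResult now externalizes the already-serialized bytes length-prefixed, rebuilds the ByteBuffer on read, and defers deserialization to value(). The following is a standalone sketch of that round trip, with plain JDK serialization standing in for SparkEnv.get.serializer and a hypothetical class name.

import java.io.{ByteArrayInputStream, Externalizable, ObjectInput, ObjectInputStream, ObjectOutput}
import java.nio.ByteBuffer

class BytesResult(var valueBytes: ByteBuffer) extends Externalizable {
  def this() = this(null)  // no-arg constructor required by Externalizable

  override def writeExternal(out: ObjectOutput): Unit = {
    // Length-prefix the payload so readExternal knows how much to read back.
    out.writeInt(valueBytes.remaining())
    out.write(valueBytes.array(), valueBytes.position(), valueBytes.remaining())
  }

  override def readExternal(in: ObjectInput): Unit = {
    val len = in.readInt()
    val bytes = new Array[Byte](len)
    in.readFully(bytes)
    valueBytes = ByteBuffer.wrap(bytes)
  }

  // Deserialize only when the caller actually wants the value, as DirectTaskResult.value() does.
  def value[T](): T =
    new ObjectInputStream(new ByteArrayInputStream(valueBytes.array())).readObject().asInstanceOf[T]
}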
@@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val taskHeaders: Seq[String] =
       Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
-      Seq("Duration", "GC Time") ++
+      Seq("Duration", "GC Time", "Result Ser Time") ++
       {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
       {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
       Seq("Errors")
@@ -101,6 +101,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
           None
         }
         else {
+          val serializationTimes = validTasks.map{case (info, metrics, exception) =>
+            metrics.get.resultSerializationTime.toDouble}
+          val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
+            ms => parent.formatDuration(ms.toLong))
+
           val serviceTimes = validTasks.map{case (info, metrics, exception) =>
             metrics.get.executorRunTime.toDouble}
           val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
@@ -149,6 +154,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
           val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
 
           val listings: Seq[Seq[String]] = Seq(
+            serializationQuantiles,
             serviceQuantiles,
             gettingResultQuantiles,
             schedulerDelayQuantiles,
@@ -183,6 +189,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
       else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
     val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
+    val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L)
 
     val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead}
     val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
@@ -210,6 +217,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
       <td sorttable_customkey={gcTime.toString}>
         {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
       </td>
+      <td sorttable_customkey={serializationTime.toString}>
+        {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
+      </td>
       {if (shuffleRead) {
         <td sorttable_customkey={shuffleReadSortable}>
           {shuffleReadReadable}
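
The StagePage changes add a "Result Ser Time" column to the per-task table and a "Result serialization time" row to the stage summary quantiles. Spark's Distribution class computes those quantiles; the following is only a rough, standalone stand-in, assuming the default probabilities 0, 0.25, 0.5, 0.75 and 1.0.

def quantiles(times: Seq[Double],
              probs: Seq[Double] = Seq(0.0, 0.25, 0.5, 0.75, 1.0)): Seq[Double] = {
  require(times.nonEmpty, "need at least one sample")
  val sorted = times.sorted
  // Nearest-rank pick for each requested probability.
  probs.map(p => sorted(math.round(p * (sorted.length - 1)).toInt))
}

// quantiles(Seq(3.0, 1.0, 7.0, 2.0, 5.0)) == Seq(1.0, 2.0, 3.0, 5.0, 7.0)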
@@ -313,6 +313,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
   }
 
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
-    new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
+    val valueSer = SparkEnv.get.serializer.newInstance()
+    new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
   }
 }
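
The test fixture change above shows the new constructor contract: callers must serialize the value before handing it to DirectTaskResult. A hedged round-trip example against the hypothetical BytesResult stand-in sketched earlier:

val bos = new java.io.ByteArrayOutputStream()
val oos = new java.io.ObjectOutputStream(bos)
oos.writeObject(Integer.valueOf(42))
oos.close()
// Construct the result from pre-serialized bytes, then read the value back on demand.
val result = new BytesResult(java.nio.ByteBuffer.wrap(bos.toByteArray))
assert(result.value[java.lang.Integer]().intValue == 42)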