Skip to content
Snippets Groups Projects
Commit 37abe842 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Tracking some task metrics even during failures.

parent 84b7fc54
No related branches found
No related tags found
No related merge requests found
package spark
import spark.executor.TaskMetrics
import spark.storage.BlockManagerId
/**
......@@ -24,7 +25,8 @@ private[spark] case class FetchFailed(
private[spark] case class ExceptionFailure(
className: String,
description: String,
stackTrace: Array[StackTraceElement])
stackTrace: Array[StackTraceElement],
metrics: Option[TaskMetrics])
extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason
......
......@@ -92,15 +92,18 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo("Running task ID " + taskId)
context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var attemptedTask: Option[Task[Any]] = None
var taskStart: Long = 0
try {
SparkEnv.set(env)
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
attemptedTask = Some(task)
logInfo("Its generation is " + task.generation)
env.mapOutputTracker.updateGeneration(task.generation)
val taskStart = System.currentTimeMillis()
taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
val taskFinish = System.currentTimeMillis()
task.metrics.foreach{ m =>
......@@ -128,7 +131,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
}
case t: Throwable => {
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
val serviceTime = (System.currentTimeMillis() - taskStart).toInt
val metrics = attemptedTask.flatMap(t => t.metrics)
metrics.foreach{m => m.executorRunTime = serviceTime}
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
// TODO: Should we exit the whole executor here? On the one hand, the failed task may
......
......@@ -618,7 +618,7 @@ class DAGScheduler(
handleExecutorLost(bmAddress.executorId, Some(task.generation))
}
case ExceptionFailure(className, description, stackTrace) =>
case ExceptionFailure(className, description, stackTrace, metrics) =>
// Do nothing here, left up to the TaskScheduler to decide how to handle user failures
case other =>
......
......@@ -571,7 +571,7 @@ private[spark] class ClusterTaskSetManager(
return
case ef: ExceptionFailure =>
sched.listener.taskEnded(tasks(index), ef, null, null, info, null)
sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
val key = ef.description
val now = System.currentTimeMillis
val (printFull, dupCount) = {
......
......@@ -145,6 +145,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
// Set the Spark execution environment for the worker thread
SparkEnv.set(env)
val ser = SparkEnv.get.closureSerializer.newInstance()
var attemptedTask: Option[Task[_]] = None
val start = System.currentTimeMillis()
var taskStart: Long = 0
try {
Accumulators.clear()
Thread.currentThread().setContextClassLoader(classLoader)
......@@ -153,10 +156,11 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
// this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
updateDependencies(taskFiles, taskJars) // Download any files added with addFile
val taskStart = System.currentTimeMillis()
val deserializedTask = ser.deserialize[Task[_]](
taskBytes, Thread.currentThread.getContextClassLoader)
val deserTime = System.currentTimeMillis() - taskStart
attemptedTask = Some(deserializedTask)
val deserTime = System.currentTimeMillis() - start
taskStart = System.currentTimeMillis()
// Run it
val result: Any = deserializedTask.run(taskId)
......@@ -174,13 +178,15 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
logInfo("Finished " + taskId)
deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
val serializedResult = ser.serialize(taskResult)
localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
case t: Throwable => {
val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
metrics.foreach{m => m.executorRunTime = serviceTime.toInt}
val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
}
}
......
......@@ -152,7 +152,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
info.markFailed()
decreaseRunningTasks(1)
val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](serializedData, getClass.getClassLoader)
sched.listener.taskEnded(task, reason, null, null, info, null)
sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
if (!finished(index)) {
copiesRunning(index) -= 1
numFailures(index) += 1
......
......@@ -41,7 +41,17 @@ private[spark] object UIWorkloadGenerator {
1
}.count
}),
("Job with delays", baseData.map(x => Thread.sleep(1000)).count)
("Partially failed phase (longer tasks)", {
baseData.map{x =>
val probFailure = (4.0 / NUM_PARTITIONS)
if (nextFloat() < probFailure) {
Thread.sleep(100)
throw new Exception("This is a task failure")
}
1
}.count
}),
("Job with delays", baseData.map(x => Thread.sleep(100)).count)
)
while (true) {
......
......@@ -78,18 +78,18 @@ private[spark] class JobProgressListener extends SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
val failureInfo: Option[ExceptionFailure] =
val (failureInfo, metrics): (Option[ExceptionFailure], TaskMetrics) =
taskEnd.reason match {
case e: ExceptionFailure =>
stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
Some(e)
(Some(e), e.metrics.get)
case _ =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
None
(None, taskEnd.taskMetrics)
}
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]())
taskList += ((taskEnd.taskInfo, taskEnd.taskMetrics, failureInfo))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment