diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index 8140cba0841e519b6a0269638cb2207e8adf6b15..bb75ec208cbc95ef479642daad84e957a602ae30 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -1,5 +1,6 @@
 package spark
 
+import spark.executor.TaskMetrics
 import spark.storage.BlockManagerId
 
 /**
@@ -24,7 +25,8 @@ private[spark] case class FetchFailed(
 private[spark] case class ExceptionFailure(
     className: String,
     description: String,
-    stackTrace: Array[StackTraceElement])
+    stackTrace: Array[StackTraceElement],
+    metrics: Option[TaskMetrics])
   extends TaskEndReason
 
 private[spark] case class OtherFailure(message: String) extends TaskEndReason
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 8bebfafce4421e559d0bcdd0a7e77863bf702b58..f01be68d1413438b4cb539f6b4f7e8f270bacee3 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -92,15 +92,18 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
       val ser = SparkEnv.get.closureSerializer.newInstance()
       logInfo("Running task ID " + taskId)
       context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
+      var attemptedTask: Option[Task[Any]] = None
+      var taskStart: Long = 0
       try {
         SparkEnv.set(env)
         Accumulators.clear()
         val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
         updateDependencies(taskFiles, taskJars)
         val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
+        attemptedTask = Some(task)
         logInfo("Its generation is " + task.generation)
         env.mapOutputTracker.updateGeneration(task.generation)
-        val taskStart = System.currentTimeMillis()
+        taskStart = System.currentTimeMillis()
         val value = task.run(taskId.toInt)
         val taskFinish = System.currentTimeMillis()
         task.metrics.foreach{ m =>
@@ -128,7 +131,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         }
 
         case t: Throwable => {
-          val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
+          val serviceTime = (System.currentTimeMillis() - taskStart).toInt
+          val metrics = attemptedTask.flatMap(t => t.metrics)
+          metrics.foreach{m => m.executorRunTime = serviceTime}
+          val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
           context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
           // TODO: Should we exit the whole executor here? On the one hand, the failed task may
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index d9ddc41aa2f3b173d51e75d3b59afe8884cd98a6..1945a4a5147cce47b065b67c79a83d5c16ad4643 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -618,7 +618,7 @@ class DAGScheduler(
           handleExecutorLost(bmAddress.executorId, Some(task.generation))
         }
 
-      case ExceptionFailure(className, description, stackTrace) =>
+      case ExceptionFailure(className, description, stackTrace, metrics) =>
         // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
 
       case other =>
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 6965cde5da3261e3eb10036237625c76860ab468..fe6420a522cecd89083f8c0eed57831c217964bc 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -571,7 +571,7 @@ private[spark] class ClusterTaskSetManager(
             return
 
           case ef: ExceptionFailure =>
-            sched.listener.taskEnded(tasks(index), ef, null, null, info, null)
+            sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
            val key = ef.description
            val now = System.currentTimeMillis
            val (printFull, dupCount) = {
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 9d375e1db8407d969b2ed76b09c4c37c7950dc11..b000e328e678472dc5b05497c651b1c897b52260 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -145,6 +145,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     // Set the Spark execution environment for the worker thread
     SparkEnv.set(env)
     val ser = SparkEnv.get.closureSerializer.newInstance()
+    var attemptedTask: Option[Task[_]] = None
+    val start = System.currentTimeMillis()
+    var taskStart: Long = 0
     try {
       Accumulators.clear()
       Thread.currentThread().setContextClassLoader(classLoader)
@@ -153,10 +156,11 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
       // this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
       val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
       updateDependencies(taskFiles, taskJars) // Download any files added with addFile
-      val taskStart = System.currentTimeMillis()
       val deserializedTask = ser.deserialize[Task[_]](
         taskBytes, Thread.currentThread.getContextClassLoader)
-      val deserTime = System.currentTimeMillis() - taskStart
+      attemptedTask = Some(deserializedTask)
+      val deserTime = System.currentTimeMillis() - start
+      taskStart = System.currentTimeMillis()
 
       // Run it
       val result: Any = deserializedTask.run(taskId)
@@ -174,13 +178,15 @@
       logInfo("Finished " + taskId)
       deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
       deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
-
       val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
       val serializedResult = ser.serialize(taskResult)
       localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult)
     } catch {
       case t: Throwable => {
-        val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
+        val serviceTime = System.currentTimeMillis() - taskStart
+        val metrics = attemptedTask.flatMap(t => t.metrics)
+        metrics.foreach{m => m.executorRunTime = serviceTime.toInt}
+        val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
         localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
       }
     }
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index 499116f6534bc6f3aa96970186433d7740145cd5..f12fec41d506bf478cc19552d4232717a96afaa6 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -152,7 +152,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
         info.markFailed()
         decreaseRunningTasks(1)
         val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](serializedData, getClass.getClassLoader)
-        sched.listener.taskEnded(task, reason, null, null, info, null)
+        sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
         if (!finished(index)) {
           copiesRunning(index) -= 1
           numFailures(index) += 1
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index 24cfe36aaacd038e7430e97030690246639fce86..8bbc6ce88ea8c29ab9c3ea9794772230e3e2123d 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -41,7 +41,17 @@ private[spark] object UIWorkloadGenerator {
           1
         }.count
       }),
-      ("Job with delays", baseData.map(x => Thread.sleep(1000)).count)
+      ("Partially failed phase (longer tasks)", {
+        baseData.map{x =>
+          val probFailure = (4.0 / NUM_PARTITIONS)
+          if (nextFloat() < probFailure) {
+            Thread.sleep(100)
+            throw new Exception("This is a task failure")
+          }
+          1
+        }.count
+      }),
+      ("Job with delays", baseData.map(x => Thread.sleep(100)).count)
     )
 
     while (true) {
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index aafa4140559cb5da251f3ff3d49538f35440ff97..36b1cd00edd234de55f099f64ce6967716be9d9d 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -78,18 +78,18 @@ private[spark] class JobProgressListener extends SparkListener {
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     val sid = taskEnd.task.stageId
-    val failureInfo: Option[ExceptionFailure] =
+    val (failureInfo, metrics): (Option[ExceptionFailure], TaskMetrics) =
       taskEnd.reason match {
         case e: ExceptionFailure =>
           stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
-          Some(e)
+          (Some(e), e.metrics.get)
        case _ =>
           stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
-          None
+          (None, taskEnd.taskMetrics)
       }
     val taskList = stageToTaskInfos.getOrElse(
       sid, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]())
-    taskList += ((taskEnd.taskInfo, taskEnd.taskMetrics, failureInfo))
+    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
     stageToTaskInfos(sid) = taskList
   }
 
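
Note (illustration, not part of the patch): both Executor and LocalScheduler apply the same pattern above, declaring attemptedTask and taskStart outside the try block so the catch handler can still compute a run time, stamp it onto the task's metrics, and attach those metrics to the ExceptionFailure it reports. The sketch below shows that pattern in isolation with simplified, hypothetical types (SimpleTask, SimpleTaskMetrics, Failure); none of these names are Spark classes.

    object FailedTaskMetricsSketch {
      class SimpleTaskMetrics {
        var executorRunTime: Int = 0
        override def toString = "SimpleTaskMetrics(executorRunTime=" + executorRunTime + ")"
      }

      class SimpleTask(body: () => Int) {
        var metrics: Option[SimpleTaskMetrics] = None
        def run(): Int = {
          metrics = Some(new SimpleTaskMetrics)  // metrics exist once the task has started
          body()
        }
      }

      // Simplified stand-in for ExceptionFailure(className, description, stackTrace, metrics).
      case class Failure(className: String, description: String, metrics: Option[SimpleTaskMetrics])

      def runTask(task: SimpleTask): Either[Failure, Int] = {
        // Declared outside the try block, mirroring attemptedTask / taskStart in the patch.
        var attemptedTask: Option[SimpleTask] = None
        var taskStart: Long = 0L
        try {
          attemptedTask = Some(task)
          taskStart = System.currentTimeMillis()
          Right(task.run())
        } catch {
          case t: Throwable =>
            // Even when the task fails, record how long it ran and ship its metrics along.
            val serviceTime = (System.currentTimeMillis() - taskStart).toInt
            val metrics = attemptedTask.flatMap(_.metrics)
            metrics.foreach(m => m.executorRunTime = serviceTime)
            Left(Failure(t.getClass.getName, t.toString, metrics))
        }
      }

      def main(args: Array[String]): Unit = {
        val failing = new SimpleTask(() => { Thread.sleep(50); throw new RuntimeException("boom") })
        println(runTask(failing))  // Left(Failure(java.lang.RuntimeException, ..., Some(SimpleTaskMetrics(...))))
      }
    }

The point of the pattern is that a failure reported to the scheduler (and ultimately to JobProgressListener) can carry Some(metrics) instead of nothing, which is why the listener code in the last hunk can read metrics from the ExceptionFailure case.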