Commit 0475ca8f authored by Reynold Xin

Merge pull request #320 from kayousterhout/erroneous_failed_msg

Remove erroneous FAILED state for killed tasks.

Currently, when tasks are killed, the Executor first sends a
status update for the task with a "KILLED" state, and then
sends a second status update with a "FAILED" state saying that
the task failed due to an exception. The second FAILED state is
misleading/unnecessary, and occurs because of a NonLocalReturnControl
exception that gets thrown by the way we kill tasks. This
commit eliminates that problem.

I'm not at all sure that this is the best way to fix this problem,
so alternate suggestions welcome. @rxin guessing you're the right
person to look at this.
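For context on the mechanism the message describes: in Scala, a `return` inside a function literal exits the enclosing method, and the compiler implements this by throwing scala.runtime.NonLocalReturnControl, which any enclosing catch that matches Throwable will intercept. A minimal standalone sketch of this (illustrative names, not Spark code):

    object NonLocalReturnDemo {
      // `return` inside the function literal exits runTask(); the compiler
      // implements this by throwing scala.runtime.NonLocalReturnControl.
      def runTask(killed: Boolean): Unit = {
        val body: () => Unit = () => {
          try {
            if (killed) return  // compiled to a NonLocalReturnControl throw
            println("task body ran")
          } catch {
            case t: Throwable =>
              // The control-flow exception lands here and would be misreported
              // as a task failure, mirroring the bug this commit removes.
              println("caught: " + t.getClass.getName)
          }
        }
        body()
      }

      def main(args: Array[String]): Unit = runTask(killed = true)
    }

Running this prints the NonLocalReturnControl class name rather than returning cleanly, which is how the spurious FAILED status update arose.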
parents 588a1695 a1b438d9
@@ -141,11 +141,6 @@ private[spark] class Executor(
     val tr = runningTasks.get(taskId)
     if (tr != null) {
       tr.kill()
-      // We remove the task also in the finally block in TaskRunner.run.
-      // The reason we need to remove it here is because killTask might be called before the task
-      // is even launched, and never reaching that finally block. ConcurrentHashMap's remove is
-      // idempotent.
-      runningTasks.remove(taskId)
     }
   }
@@ -167,6 +162,8 @@ private[spark] class Executor(
   class TaskRunner(execBackend: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
     extends Runnable {

+    object TaskKilledException extends Exception
+
     @volatile private var killed = false
     @volatile private var task: Task[Any] = _
@@ -200,9 +197,11 @@ private[spark] class Executor(
         // If this task has been killed before we deserialized it, let's quit now. Otherwise,
         // continue executing the task.
         if (killed) {
-          logInfo("Executor killed task " + taskId)
-          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
-          return
+          // Throw an exception rather than returning, because returning within a try{} block
+          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
+          // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
+          // for the task.
+          throw TaskKilledException
         }

         attemptedTask = Some(task)
@@ -216,9 +215,7 @@ private[spark] class Executor(
         // If the task has been killed, let's fail it.
         if (task.killed) {
-          logInfo("Executor killed task " + taskId)
-          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
-          return
+          throw TaskKilledException
         }

         val resultSer = SparkEnv.get.serializer.newInstance()
@@ -260,6 +257,11 @@ private[spark] class Executor(
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
         }

+        case TaskKilledException => {
+          logInfo("Executor killed task " + taskId)
+          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
+        }
+
         case t: Throwable => {
           val serviceTime = (System.currentTimeMillis() - taskStart).toInt
           val metrics = attemptedTask.flatMap(t => t.metrics)
...
@@ -285,7 +285,8 @@ private[spark] class TaskSchedulerImpl(
           }
         }
         case None =>
-          logInfo("Ignoring update from TID " + tid + " because its task set is gone")
+          logInfo("Ignoring update with state %s from TID %s because its task set is gone"
+            .format(state, tid))
       }
     } catch {
       case e: Exception => logError("Exception in statusUpdate", e)
...
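In sum, the pattern the diff adopts can be sketched in isolation as follows (illustrative names and a simplified stand-in for ExecutorBackend.statusUpdate, not the actual Spark classes): a dedicated exception object signals the kill, and a single catch branch translates it into exactly one KILLED status update, so no FAILED update is ever emitted for a killed task.

    object KillByExceptionDemo {
      // Dedicated control-flow signal, mirroring the TaskKilledException
      // object the diff introduces inside TaskRunner.
      object TaskKilledException extends Exception

      // Simplified stand-in for ExecutorBackend.statusUpdate (hypothetical).
      def statusUpdate(taskId: Long, state: String): Unit =
        println(s"task $taskId -> $state")

      def runTask(taskId: Long, killed: Boolean): Unit =
        try {
          if (killed) throw TaskKilledException  // throw instead of `return`
          statusUpdate(taskId, "FINISHED")
        } catch {
          case TaskKilledException =>
            // Exactly one terminal update is sent for a killed task: KILLED.
            statusUpdate(taskId, "KILLED")
          case _: Throwable =>
            statusUpdate(taskId, "FAILED")
        }

      def main(args: Array[String]): Unit = runTask(1L, killed = true)
    }

Because the throw replaces the non-local `return`, no NonLocalReturnControl is generated, and the generic Throwable branch is never reached for a kill.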