Commit b2f24f94 authored by Tejas Patil, committed by Shixiong Zhu

[SPARK-16230][CORE] CoarseGrainedExecutorBackend to self kill if there is an exception while creating an Executor

## What changes were proposed in this pull request?

With the fix from SPARK-13112, I see that `LaunchTask` is always processed after `RegisteredExecutor` is done, so it gets a chance to do all retries to start up an executor. There is still a problem: if `Executor` creation itself fails with an exception, the exception goes unnoticed, and the executor is killed only later, when it tries to process a `LaunchTask` while `executor` is null: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L88 So if one looks at the logs, they do not say that there was a problem during `Executor` creation and that that is why it was killed.
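The silent-failure pattern described above can be sketched in plain Scala (hypothetical names, not Spark's actual classes; the swallowed catch stands in for the RPC layer dropping the exception):

```scala
// Minimal reproduction of the failure mode: the constructor throws, the
// field stays null, and the only visible symptom appears later, in a
// handler that has no idea why the field is null.
object SilentCreationFailure {
  final class FragileExecutor {
    // Simulate a startup failure, e.g. a bad configuration value.
    throw new IllegalStateException("could not initialize executor")
  }

  var executor: FragileExecutor = null

  def register(): Unit =
    try { executor = new FragileExecutor() }
    catch { case _: Throwable => () } // swallowed: creation failure never logged

  def launchTask(): String =
    if (executor == null) "executor was null" else "task launched"

  def main(args: Array[String]): Unit = {
    register()
    println(launchTask()) // prints "executor was null"
  }
}
```

The log line at the point of death mentions only the null executor, not the exception that caused it, which is exactly the diagnostic gap this PR closes.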

This PR explicitly catches exceptions during `Executor` creation, logs a proper message, and then exits the JVM. I have also changed the `exitExecutor` method to accept a `reason`, so that backends can use that reason for things like logging to a DB to get an aggregate of such exits at the cluster level.
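The reason-formatting decision in the new exit path can be sketched as a standalone object (hypothetical name, not the actual class; the real method lives on `CoarseGrainedExecutorBackend`, logs via `logError`, and ends with `System.exit(code)`, which is omitted here so the sketch stays runnable):

```scala
// Standalone sketch of the new exitExecutor(code, reason, throwable) shape.
object ExitReasonSketch {
  // Include the throwable's message when one is supplied, otherwise just
  // the reason (mirrors the branch on `throwable != null` in the patch).
  def formatExitReason(reason: String, throwable: Throwable = null): String =
    if (throwable != null) s"$reason (${throwable.getMessage})" else reason

  def main(args: Array[String]): Unit = {
    println(formatExitReason("Received KillTask command but executor was null"))
    println(formatExitReason(
      "Unable to create executor due to init failure",
      new RuntimeException("init failure")))
  }
}
```

A subclassing backend could override the method to report `reason` to external storage before terminating, which is the aggregation use case mentioned above.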

## How was this patch tested?

I am relying on existing tests.

Author: Tejas Patil <tejasp@fb.com>

Closes #14202 from tejasapatil/exit_executor_failure.
parent 611a8ca5
```diff
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable
 import scala.util.{Failure, Success}
+import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -64,8 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       case Success(msg) =>
         // Always receive `true`. Just ignore it
       case Failure(e) =>
-        logError(s"Cannot register with driver: $driverUrl", e)
-        exitExecutor(1)
+        exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
     }(ThreadUtils.sameThread)
   }
@@ -78,16 +78,19 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
-      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+      try {
+        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+      } catch {
+        case NonFatal(e) =>
+          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
+      }
 
     case RegisterExecutorFailed(message) =>
-      logError("Slave registration failed: " + message)
-      exitExecutor(1)
+      exitExecutor(1, "Slave registration failed: " + message)
 
     case LaunchTask(data) =>
       if (executor == null) {
-        logError("Received LaunchTask command but executor was null")
-        exitExecutor(1)
+        exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
         val taskDesc = ser.deserialize[TaskDescription](data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
@@ -97,8 +100,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case KillTask(taskId, _, interruptThread) =>
       if (executor == null) {
-        logError("Received KillTask command but executor was null")
-        exitExecutor(1)
+        exitExecutor(1, "Received KillTask command but executor was null")
       } else {
         executor.killTask(taskId, interruptThread)
       }
@@ -127,8 +129,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     if (stopping.get()) {
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
-      logError(s"Driver $remoteAddress disassociated! Shutting down.")
-      exitExecutor(1)
+      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -147,7 +148,14 @@ private[spark] class CoarseGrainedExecutorBackend(
    * executor exits differently. For e.g. when an executor goes down,
    * back-end may not want to take the parent process down.
    */
-  protected def exitExecutor(code: Int): Unit = System.exit(code)
+  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = {
+    if (throwable != null) {
+      logError(reason, throwable)
+    } else {
+      logError(reason)
+    }
+    System.exit(code)
+  }
 }
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
```