diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index ccc6c36e9c79a19a5336f77455297e24af9b7722..e30839c49c04f3ac87fb6c432108775e641a96d3 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.util.{Failure, Success} +import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.TaskState.TaskState @@ -64,8 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend( case Success(msg) => // Always receive `true`. Just ignore it case Failure(e) => - logError(s"Cannot register with driver: $driverUrl", e) - exitExecutor(1) + exitExecutor(1, s"Cannot register with driver: $driverUrl", e) }(ThreadUtils.sameThread) } @@ -78,16 +78,19 @@ private[spark] class CoarseGrainedExecutorBackend( override def receive: PartialFunction[Any, Unit] = { case RegisteredExecutor => logInfo("Successfully registered with driver") - executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) + try { + executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) + } catch { + case NonFatal(e) => + exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) + } case RegisterExecutorFailed(message) => - logError("Slave registration failed: " + message) - exitExecutor(1) + exitExecutor(1, "Slave registration failed: " + message) case LaunchTask(data) => if (executor == null) { - logError("Received LaunchTask command but executor was null") - exitExecutor(1) + exitExecutor(1, "Received LaunchTask command but executor was null") } else { val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) @@ -97,8 +100,7 @@ private[spark] class CoarseGrainedExecutorBackend( case KillTask(taskId, _, interruptThread) => if (executor == null) { - logError("Received KillTask command but executor was null") - exitExecutor(1) + exitExecutor(1, "Received KillTask command but executor was null") } else { executor.killTask(taskId, interruptThread) } @@ -127,8 +129,7 @@ private[spark] class CoarseGrainedExecutorBackend( if (stopping.get()) { logInfo(s"Driver from $remoteAddress disconnected during shutdown") } else if (driver.exists(_.address == remoteAddress)) { - logError(s"Driver $remoteAddress disassociated! Shutting down.") - exitExecutor(1) + exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.") } else { logWarning(s"An unknown ($remoteAddress) driver disconnected.") } @@ -147,7 +148,14 @@ private[spark] class CoarseGrainedExecutorBackend( * executor exits differently. For e.g. when an executor goes down, * back-end may not want to take the parent process down. */ - protected def exitExecutor(code: Int): Unit = System.exit(code) + protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = { + if (throwable != null) { + logError(reason, throwable) + } else { + logError(reason) + } + System.exit(code) + } } private[spark] object CoarseGrainedExecutorBackend extends Logging {