diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index cb29a6b8b46e2d78462dc170ee80342d1ace82d3..2552958d27e37a0aee8afddba93e40c2fb83f4bd 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -50,9 +50,14 @@ private[spark] class Executor extends Logging {
       override def uncaughtException(thread: Thread, exception: Throwable) {
         try {
           logError("Uncaught exception in thread " + thread, exception)
-          System.exit(1)
+          if (exception.isInstanceOf[OutOfMemoryError]) {
+            System.exit(ExecutorExitCode.OOM)
+          } else {
+            System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+          }
         } catch {
-          case t: Throwable => System.exit(2)
+          case oom: OutOfMemoryError => System.exit(ExecutorExitCode.OOM)
+          case t: Throwable => System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
         }
       }
     }
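Note on the Executor hunk above: the exception type is checked twice because logError itself can fail (typically with another OutOfMemoryError while formatting the message), so the catch clause needs its own OOM case. Below is a minimal, self-contained sketch of the same handler pattern; the object name is hypothetical and the exit-code constants are inlined rather than taken from ExecutorExitCode.

```scala
// Sketch only: constants inlined, names hypothetical; the real code lives in
// spark.executor.Executor and spark.executor.ExecutorExitCode.
object ExitCodeHandlerSketch {
  val UNCAUGHT_EXCEPTION = 50
  val UNCAUGHT_EXCEPTION_TWICE = 51
  val OOM = 52

  def install() {
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(thread: Thread, exception: Throwable) {
        try {
          // Logging can itself fail (e.g. another OutOfMemoryError), hence the catch below.
          System.err.println("Uncaught exception in thread " + thread + ": " + exception)
          if (exception.isInstanceOf[OutOfMemoryError]) {
            System.exit(OOM)
          } else {
            System.exit(UNCAUGHT_EXCEPTION)
          }
        } catch {
          case oom: OutOfMemoryError => System.exit(OOM)
          case t: Throwable => System.exit(UNCAUGHT_EXCEPTION_TWICE)
        }
      }
    })
  }

  def main(args: Array[String]) {
    install()
    throw new RuntimeException("boom") // the process should exit with status 50
  }
}
```

Run standalone, the process terminates with status 50, which whatever launched it can observe as the child's exit status.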
diff --git a/core/src/main/scala/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/spark/executor/ExecutorExitCode.scala
new file mode 100644
index 0000000000000000000000000000000000000000..fd76029cb34446e6c33f1c53cf593db1089d12b1
--- /dev/null
+++ b/core/src/main/scala/spark/executor/ExecutorExitCode.scala
@@ -0,0 +1,43 @@
+package spark.executor
+
+/**
+ * These are exit codes that executors should use to provide the master with information about
+ * executor failures assuming that the cluster management framework can capture the exit codes
+ * (but perhaps not log files). The exit code constants here are chosen to be unlikely to conflict
+ * with "natural" exit statuses that may be caused by the JVM or user code. In particular,
+ * exit codes 128+ arise on some Unix-likes as a result of signals, and it appears that the
+ * OpenJDK JVM may use exit code 1 in some of its own "last chance" code.
+ */
+private[spark]
+object ExecutorExitCode {
+  /** The default uncaught exception handler was reached. */
+  val UNCAUGHT_EXCEPTION = 50
+
+  /** The default uncaught exception handler was called and an exception was encountered while
+      logging the exception. */
+  val UNCAUGHT_EXCEPTION_TWICE = 51
+
+  /** The default uncaught exception handler was reached, and the uncaught exception was an
+      OutOfMemoryError. */
+  val OOM = 52
+
+  /** DiskStore failed to create a local temporary directory after many attempts. */
+  val DISK_STORE_FAILED_TO_CREATE_DIR = 53
+
+  def explainExitCode(exitCode: Int): String = {
+    exitCode match {
+      case UNCAUGHT_EXCEPTION => "Uncaught exception"
+      case UNCAUGHT_EXCEPTION_TWICE => "Uncaught exception, and logging the exception failed"
+      case OOM => "OutOfMemoryError"
+      case DISK_STORE_FAILED_TO_CREATE_DIR =>
+        "Failed to create local directory (bad spark.local.dir?)"
+      case _ =>
+        "Unknown executor exit code (" + exitCode + ")" + (
+          if (exitCode > 128)
+            " (died from signal " + (exitCode - 128) + "?)"
+          else
+            ""
+        )
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index f5e852d203b496ff988678a7ee3316596f6d7dd2..20f6e65020f54d87418ffb8a1441a886f759c754 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,15 +249,22 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
-  def slaveLost(slaveId: String) {
+  def slaveLost(slaveId: String, reason: ExecutorLossReason) {
     var failedHost: Option[String] = None
     synchronized {
       val host = slaveIdToHost(slaveId)
       if (hostsAlive.contains(host)) {
+        logError("Lost an executor on " + host + ": " + reason)
         slaveIdsWithExecutors -= slaveId
         hostsAlive -= host
         activeTaskSetsQueue.foreach(_.hostLost(host))
         failedHost = Some(host)
+      } else {
+        // We may get multiple slaveLost() calls with different loss reasons. For example, one
+        // may be triggered by a dropped connection from the slave while another may be a report
+        // of executor termination from Mesos. We produce log messages for both so we eventually
+        // report the termination reason.
+        logError("Lost an executor on " + host + " (already removed): " + reason)
       }
     }
     if (failedHost != None) {
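For context on how these codes are meant to be consumed: the cluster manager only sees the executor JVM's exit status, and explainExitCode turns that number back into a readable reason for the log message added to ClusterScheduler above. The sketch below imitates that round trip; it assumes a Unix-like sh on the PATH, and describe is a local stand-in for ExecutorExitCode.explainExitCode, not the Spark API.

```scala
// Sketch: assumes a Unix-like `sh`; `describe` mirrors the mapping in
// ExecutorExitCode.explainExitCode but is a local stand-in, not the Spark API.
object ExitStatusCaptureSketch {
  def describe(exitCode: Int): String = exitCode match {
    case 50 => "Uncaught exception"
    case 51 => "Uncaught exception, and logging the exception failed"
    case 52 => "OutOfMemoryError"
    case 53 => "Failed to create local directory (bad spark.local.dir?)"
    case c if c > 128 => "Unknown exit code " + c + " (died from signal " + (c - 128) + "?)"
    case c => "Unknown exit code " + c
  }

  def main(args: Array[String]) {
    // Stand-in for a launched executor JVM that dies with the OOM exit code.
    val process = new ProcessBuilder("sh", "-c", "exit 52").start()
    val status = process.waitFor()
    println("Child exited with " + status + ": " + describe(status))
  }
}
```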
diff --git a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
new file mode 100644
index 0000000000000000000000000000000000000000..bba7de6a65c3d17aab47bdfa07c464ee7e801604
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
@@ -0,0 +1,21 @@
+package spark.scheduler.cluster
+
+import spark.executor.ExecutorExitCode
+
+/**
+ * Represents an explanation for an executor or whole slave failing or exiting.
+ */
+private[spark]
+class ExecutorLossReason(val message: String) {
+  override def toString: String = message
+}
+
+private[spark]
+case class ExecutorExited(val exitCode: Int)
+  extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+}
+
+private[spark]
+case class SlaveLost(_message: String = "Slave lost")
+  extends ExecutorLossReason(_message) {
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7aba7324ab00992702a283caf164f0f631d06299..efaf2d330c4fa25f846df304653e6ed7b2e38932 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,6 +19,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
   val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+  val executorIdToSlaveId = new HashMap[String, String]
 
   // Memory used by each executor (in megabytes)
   val executorMemory = {
@@ -65,9 +66,27 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
+    executorIdToSlaveId += id -> workerId
     logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
       id, host, cores, Utils.memoryMegabytesToString(memory)))
   }
 
-  def executorRemoved(id: String, message: String) {}
+  def executorRemoved(id: String, message: String) {
+    var reason: ExecutorLossReason = SlaveLost(message)
+    if (message.startsWith("Command exited with code ")) {
+      try {
+        reason = ExecutorExited(message.substring("Command exited with code ".length).toInt)
+      } catch {
+        case nfe: NumberFormatException => {}
+      }
+    }
+    logInfo("Executor %s removed: %s".format(id, message))
+    executorIdToSlaveId.get(id) match {
+      case Some(slaveId) =>
+        executorIdToSlaveId.remove(id)
+        scheduler.slaveLost(slaveId, reason)
+      case None =>
+        logInfo("No slave ID known for executor %s".format(id))
+    }
+  }
 }
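The executorRemoved change above depends on the deploy master's removal message carrying the fixed prefix "Command exited with code "; anything else is treated as a generic SlaveLost. Below is a standalone sketch of just that parsing step, with a hypothetical parseExitCode helper (the real logic stays inline in executorRemoved).

```scala
// Sketch: `parseExitCode` is a hypothetical helper; SparkDeploySchedulerBackend does the
// same parsing inline inside executorRemoved.
object RemovalMessageSketch {
  private val prefix = "Command exited with code "

  /** Returns Some(exitCode) if the removal message carries an exit status, else None. */
  def parseExitCode(message: String): Option[Int] = {
    if (message.startsWith(prefix)) {
      try {
        Some(message.substring(prefix.length).toInt)
      } catch {
        case nfe: NumberFormatException => None
      }
    } else {
      None
    }
  }

  def main(args: Array[String]) {
    println(parseExitCode("Command exited with code 52")) // Some(52)
    println(parseExitCode("Worker shutting down"))        // None
  }
}
```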
removing it") val numCores = freeCores(slaveId) actorToSlaveId -= slaveActor(slaveId) @@ -109,7 +109,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor freeCores -= slaveId slaveHost -= slaveId totalCoreCount.addAndGet(-numCores) - scheduler.slaveLost(slaveId) + scheduler.slaveLost(slaveId, SlaveLost(reason)) } } diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index 814443fa52745a11fab771cf8de6a28bb90e792e..8c7a1dfbc0570b80af2517a7a572eef106c9badd 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -267,17 +267,23 @@ private[spark] class MesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} - override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { + private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { logInfo("Mesos slave lost: " + slaveId.getValue) synchronized { slaveIdsWithExecutors -= slaveId.getValue } - scheduler.slaveLost(slaveId.getValue) + scheduler.slaveLost(slaveId.getValue, reason) + } + + override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { + recordSlaveLost(d, slaveId, SlaveLost()) } - override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) { - logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) - slaveLost(d, s) + override def executorLost(d: SchedulerDriver, executorId: ExecutorID, + slaveId: SlaveID, status: Int) { + logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue, + slaveId.getValue)) + recordSlaveLost(d, slaveId, ExecutorExited(status)) } // TODO: query Mesos for number of cores diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 8ba64e4b76a0e142307932da74b0f058833d0ac1..b5561479dbf50ff225511d31df387fe48bc9a296 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -10,6 +10,8 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import scala.collection.mutable.ArrayBuffer +import spark.executor.ExecutorExitCode + import spark.Utils /** @@ -162,7 +164,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) if (!foundLocalDir) { logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create local dir in " + rootDir) - System.exit(1) + System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } logInfo("Created local directory at " + localDir) localDir