From 5fccb567b37a94445512c7ec20b830b5e062089f Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@apache.org>
Date: Mon, 30 Jun 2014 15:12:38 -0700
Subject: [PATCH] [SPARK-2318] When exiting on a signal, print the signal name first.

Author: Reynold Xin <rxin@apache.org>

Closes #1260 from rxin/signalhandler1 and squashes the following commits:

8e73552 [Reynold Xin] Uh add Logging back in ApplicationMaster.
0402ba8 [Reynold Xin] Synchronize SignalLogger.register.
dc70705 [Reynold Xin] Added SignalLogger to YARN ApplicationMaster.
79a21b4 [Reynold Xin] Added license header.
0da052c [Reynold Xin] Added the SignalLogger itself.
e587d2e [Reynold Xin] [SPARK-2318] When exiting on a signal, print the signal name first.
---
 .../apache/spark/deploy/master/Master.scala   |  5 +-
 .../apache/spark/deploy/worker/Worker.scala   |  5 +-
 .../CoarseGrainedExecutorBackend.scala        |  6 +-
 .../spark/executor/MesosExecutorBackend.scala |  5 +-
 .../org/apache/spark/util/SignalLogger.scala  | 60 +++++++++++++++++++
 .../spark/deploy/yarn/ApplicationMaster.scala |  5 +-
 .../spark/deploy/yarn/ApplicationMaster.scala |  5 +-
 7 files changed, 79 insertions(+), 12 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/util/SignalLogger.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 33ffcbd216..11545b8203 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -41,7 +41,7 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
 
 private[spark] class Master(
     host: String,
@@ -755,12 +755,13 @@ private[spark] class Master(
   }
 }
 
-private[spark] object Master {
+private[spark] object Master extends Logging {
   val systemName = "sparkMaster"
   private val actorName = "Master"
   val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
 
   def main(argStrings: Array[String]) {
+    SignalLogger.register(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)
     val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a0ecaf709f..ce42544305 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
 
 /**
  * @param masterUrls Each url should look like spark://host:port.
@@ -365,8 +365,9 @@ private[spark] class Worker(
   }
 }
 
-private[spark] object Worker {
+private[spark] object Worker extends Logging {
   def main(argStrings: Array[String]) {
+    SignalLogger.register(log)
     val args = new WorkerArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort,
       args.cores, args.memory, args.masters, args.workDir)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b5fd334f40..8d31bd05fd 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
@@ -97,10 +97,12 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 }
 
-private[spark] object CoarseGrainedExecutorBackend {
+private[spark] object CoarseGrainedExecutorBackend extends Logging {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
     workerUrl: Option[String]) {
+    SignalLogger.register(log)
+
     SparkHadoopUtil.get.runAsSparkUser { () =>
       // Debug code
       Utils.checkHost(hostname)
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 74100498bb..2232e6237b 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -25,8 +25,8 @@ import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 
 import org.apache.spark.{Logging, TaskState}
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.{SignalLogger, Utils}
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor
@@ -93,8 +93,9 @@ private[spark] class MesosExecutorBackend
 /**
  * Entry point for Mesos executor.
  */
-private[spark] object MesosExecutorBackend {
+private[spark] object MesosExecutorBackend extends Logging {
   def main(args: Array[String]) {
+    SignalLogger.register(log)
     SparkHadoopUtil.get.runAsSparkUser { () =>
       MesosNativeLibrary.load()
       // Create a new Executor and start it running
diff --git a/core/src/main/scala/org/apache/spark/util/SignalLogger.scala b/core/src/main/scala/org/apache/spark/util/SignalLogger.scala
new file mode 100644
index 0000000000..d769b54fa2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SignalLogger.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.commons.lang.SystemUtils
+import org.slf4j.Logger
+import sun.misc.{Signal, SignalHandler}
+
+/**
+ * Used to log signals received. This can be very useful in debugging crashes or kills.
+ *
+ * Inspired by Colin Patrick McCabe's similar class from Hadoop.
+ */
+private[spark] object SignalLogger {
+
+  private var registered = false
+
+  /** Register a signal handler to log signals on UNIX-like systems. */
+  def register(log: Logger): Unit = synchronized {
+    if (SystemUtils.IS_OS_UNIX) {
+      require(!registered, "Can't re-install the signal handlers")
+      registered = true
+
+      val signals = Seq("TERM", "HUP", "INT")
+      for (signal <- signals) {
+        try {
+          new SignalLoggerHandler(signal, log)
+        } catch {
+          case e: Exception => log.warn("Failed to register signal handler " + signal, e)
+        }
+      }
+      log.info("Registered signal handlers for [" + signals.mkString(", ") + "]")
+    }
+  }
+}
+
+private sealed class SignalLoggerHandler(name: String, log: Logger) extends SignalHandler {
+
+  val prevHandler = Signal.handle(new Signal(name), this)
+
+  override def handle(signal: Signal): Unit = {
+    log.error("RECEIVED SIGNAL " + signal.getNumber() + ": SIG" + signal.getName())
+    prevHandler.handle(signal)
+  }
+}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1cc9c33cd2..438737f7a6 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SignalLogger, Utils}
 
 /**
  * An application master that runs the users driver program and allocates executors.
@@ -409,7 +409,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   }
 }
 
-object ApplicationMaster {
+object ApplicationMaster extends Logging {
   // Number of times to wait for the allocator loop to complete.
   // Each loop iteration waits for 100ms, so maximum of 3 seconds.
   // This is to ensure that we have reasonable number of containers before we start
@@ -487,6 +487,7 @@ object ApplicationMaster {
   }
 
   def main(argStrings: Array[String]) {
+    SignalLogger.register(log)
     val args = new ApplicationMasterArguments(argStrings)
     SparkHadoopUtil.get.runAsSparkUser { () =>
       new ApplicationMaster(args).run()
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 6244332f23..ee1e9c9c23 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SignalLogger, Utils}
 
 /**
@@ -363,7 +363,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   }
 }
 
-object ApplicationMaster {
+object ApplicationMaster extends Logging {
   // Number of times to wait for the allocator loop to complete.
   // Each loop iteration waits for 100ms, so maximum of 3 seconds.
   // This is to ensure that we have reasonable number of containers before we start
@@ -455,6 +455,7 @@ object ApplicationMaster {
   }
 
   def main(argStrings: Array[String]) {
+    SignalLogger.register(log)
     val args = new ApplicationMasterArguments(argStrings)
     SparkHadoopUtil.get.runAsSparkUser { () =>
       new ApplicationMaster(args).run()
--
GitLab
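
Note on the pattern, as a standalone sketch rather than part of the patch above: the key detail in the new SignalLogger is that sun.misc.Signal.handle returns the handler that was previously installed, so SignalLoggerHandler can log the signal and then delegate to the old behavior. For SIGTERM that previous handler is normally the JVM's default, which shuts the process down, so the log line lands just before exit. The sketch below exercises the same chaining outside Spark; the names LoggingHandler and SignalChainDemo are hypothetical, and only the use of sun.misc.Signal and the "RECEIVED SIGNAL" message format are taken from the patch.

import sun.misc.{Signal, SignalHandler}

// Installs itself for the named signal and keeps a reference to whatever
// handler was registered before, mirroring SignalLoggerHandler above.
class LoggingHandler(name: String) extends SignalHandler {
  // Signal.handle returns the previously installed handler for this signal.
  private val prevHandler: SignalHandler = Signal.handle(new Signal(name), this)

  override def handle(signal: Signal): Unit = {
    // Prints e.g. "RECEIVED SIGNAL 15: SIGTERM" on Linux when the process is killed.
    Console.err.println("RECEIVED SIGNAL " + signal.getNumber + ": SIG" + signal.getName)
    // Fall through to the old behavior (for TERM, normally JVM shutdown).
    prevHandler.handle(signal)
  }
}

object SignalChainDemo {
  def main(args: Array[String]): Unit = {
    // The same three signals the patch registers handlers for.
    Seq("TERM", "HUP", "INT").foreach(new LoggingHandler(_))
    println("Handlers registered; from another shell, try: kill -TERM <pid>")
    Thread.sleep(60 * 1000L) // keep the JVM alive so a signal can be delivered
  }
}

Running the demo and issuing kill -TERM against its pid should print the RECEIVED SIGNAL line first and then exit through the previous handler, which is the behavior this patch adds to Master, Worker, both executor backends, and both YARN ApplicationMasters.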