From 0f24576c08a361f323b7ad9babfd5d8431d57df0 Mon Sep 17 00:00:00 2001
From: Aaron Davidson <aaron@databricks.com>
Date: Mon, 2 Dec 2013 11:42:53 -0800
Subject: [PATCH] Cleanup and documentation of SparkActorSystem

---
 .../apache/spark/util/SparkActorSystem.scala  | 114 +++++-------------
 1 file changed, 29 insertions(+), 85 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala b/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
index 461e7ab08f..d329063e43 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
@@ -2,111 +2,55 @@
  *  Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
  */
 
+// Must be in akka.actor package as ActorSystemImpl is protected[akka].
 package akka.actor
 
+import scala.util.control.{ControlThrowable, NonFatal}
+
 import com.typesafe.config.Config
-import akka.util._
-import scala.util.control.{NonFatal, ControlThrowable}
 
 /**
- * An actorSystem specific to spark. It has an additional feature of letting spark tolerate
- * fatal exceptions.
+ * An ActorSystem specific to Spark. Based off of [[akka.actor.ActorSystem]].
+ * The only change from the default system is that we do not shut down the ActorSystem
+ * in the event of a fatal exception. This is necessary as Spark is allowed to recover
+ * from fatal exceptions (see [[org.apache.spark.executor.Executor]]).
  */
 object SparkActorSystem {
-
-  def apply(name: String, config: Config): ActorSystem = apply(name, config, findClassLoader())
+  def apply(name: String, config: Config): ActorSystem =
+    apply(name, config, ActorSystem.findClassLoader())
 
   def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
     new SparkActorSystemImpl(name, config, classLoader).start()
-
-  /**
-   * INTERNAL API
-   */
-  private[akka] def findClassLoader(): ClassLoader = {
-    def findCaller(get: Int ⇒ Class[_]): ClassLoader =
-      Iterator.from(2 /*is the magic number, promise*/).map(get) dropWhile {
-        c ⇒
-          c != null &&
-            (c.getName.startsWith("akka.actor.ActorSystem") ||
-              c.getName.startsWith("scala.Option") ||
-              c.getName.startsWith("scala.collection.Iterator") ||
-              c.getName.startsWith("akka.util.Reflect"))
-      } next() match {
-        case null ⇒ getClass.getClassLoader
-        case c ⇒ c.getClassLoader
-      }
-
-    Option(Thread.currentThread.getContextClassLoader) orElse
-      (Reflect.getCallerClass map findCaller) getOrElse
-      getClass.getClassLoader
-  }
 }
 
-private[akka] class SparkActorSystemImpl(override val name: String,
-                                         applicationConfig: Config,
-                                         classLoader: ClassLoader)
+private[akka] class SparkActorSystemImpl(
+    override val name: String,
+    applicationConfig: Config,
+    classLoader: ClassLoader)
   extends ActorSystemImpl(name, applicationConfig, classLoader) {
 
-  protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
+  protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
+    val fallbackHandler = super.uncaughtExceptionHandler
+
     new Thread.UncaughtExceptionHandler() {
       def uncaughtException(thread: Thread, cause: Throwable): Unit = {
-        cause match {
-          case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable
-          ⇒ log.error(cause, "Uncaught error from thread [{}]", thread.getName)
-          case _ ⇒
-            if (settings.JvmExitOnFatalError) {
-              try {
-                log.error(cause, "Uncaught error from thread [{}] shutting down JVM since " +
-                  "'akka.jvm-exit-on-fatal-error' is enabled", thread.getName)
-                import System.err
-                err.print("Uncaught error from thread [")
-                err.print(thread.getName)
-                err.print("] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for " +
-                  "ActorSystem[")
-                err.print(name)
-                err.println("]")
-                cause.printStackTrace(System.err)
-                System.err.flush()
-              } finally {
-                System.exit(-1)
-              }
-            } else {
-              log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
-                "ActorSystem tolerating and continuing.... [{}]", thread.getName, name)
-              //shutdown()                 //TODO make it configurable
-              if (thread.isAlive) log.error("Thread is still alive")
-              else {
-                log.error("Thread is dead")
-              }
-            }
+        if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
+          log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
+            "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
+          //shutdown()                 //TODO make it configurable
+        } else {
+          fallbackHandler.uncaughtException(thread, cause)
         }
       }
     }
-
-  override def stop(actor: ActorRef): Unit = {
-    val path = actor.path
-    val guard = guardian.path
-    val sys = systemGuardian.path
-    path.parent match {
-      case `guard` ⇒ guardian ! StopChild(actor)
-      case `sys` ⇒ systemGuardian ! StopChild(actor)
-      case _ ⇒ actor.asInstanceOf[InternalActorRef].stop()
-    }
   }
 
-
-  override def /(actorName: String): ActorPath = guardian.path / actorName
-
-  override def /(path: Iterable[String]): ActorPath = guardian.path / path
-
-  private lazy val _start: this.type = {
-    // the provider is expected to start default loggers, LocalActorRefProvider does this
-    provider.init(this)
-    this
+  def isFatalError(e: Throwable): Boolean = {
+    e match {
+      case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
+        false
+      case _ =>
+        true
+    }
   }
-
-  override def start(): this.type = _start
-
-  override def toString: String = lookupRoot.path.root.address.toString
-
 }
-- 
GitLab