From ff562b2396bb59b8c71d800d37b3a36ddac3488f Mon Sep 17 00:00:00 2001
From: Zhen Peng <zhenpeng01@baidu.com>
Date: Fri, 30 May 2014 10:11:02 -0700
Subject: [PATCH] [SPARK-1901] worker should make sure executor has exited
 before updating executor's info

https://issues.apache.org/jira/browse/SPARK-1901

Author: Zhen Peng <zhenpeng01@baidu.com>

Closes #854 from zhpengg/bugfix-worker-kills-executor and squashes the following commits:

21d380b [Zhen Peng] add some error messages
506cea6 [Zhen Peng] add some docs for killProcess()
a0b9860 [Zhen Peng] [SPARK-1901] worker should make sure executor has exited before updating executor's info
---
 .../spark/deploy/worker/ExecutorRunner.scala  | 20 +++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2051403682..d27e0e1f15 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -61,17 +61,23 @@ private[spark] class ExecutorRunner(
     // Shutdown hook that kills actors on shutdown.
     shutdownHook = new Thread() {
       override def run() {
-        killProcess()
+        killProcess(Some("Worker shutting down"))
       }
     }
     Runtime.getRuntime.addShutdownHook(shutdownHook)
   }
 
-  private def killProcess() {
+  /**
+   * kill executor process, wait for exit and notify worker to update resource status
+   *
+   * @param message the exception message which caused the executor's death 
+   */
+  private def killProcess(message: Option[String]) {
     if (process != null) {
       logInfo("Killing process!")
       process.destroy()
-      process.waitFor()
+      val exitCode = process.waitFor()
+      worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
     }
   }
 
@@ -82,7 +88,6 @@ private[spark] class ExecutorRunner(
       workerThread.interrupt()
       workerThread = null
       state = ExecutorState.KILLED
-      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
     }
   }
@@ -148,14 +153,13 @@ private[spark] class ExecutorRunner(
     } catch {
       case interrupted: InterruptedException => {
         logInfo("Runner thread for executor " + fullId + " interrupted")
-        killProcess()
+        state = ExecutorState.KILLED
+        killProcess(None)
       }
       case e: Exception => {
         logError("Error running executor", e)
-        killProcess()
         state = ExecutorState.FAILED
-        val message = e.getClass + ": " + e.getMessage
-        worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
+        killProcess(Some(e.toString))
       }
     }
   }
-- 
GitLab