From 7c5ff733ee1d3729b4b26f7c5542ca00c4d64139 Mon Sep 17 00:00:00 2001
From: Jey Kottalam <jey@cs.berkeley.edu>
Date: Thu, 23 May 2013 11:50:24 -0700
Subject: [PATCH] PySpark daemon: fix deadlock, improve error handling

---
 python/pyspark/daemon.py | 67 ++++++++++++++++++++++++++++++----------
 1 file changed, 50 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index ab9c19df57..2b5e9b3581 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -1,6 +1,7 @@
 import os
 import sys
 import multiprocessing
+from ctypes import c_bool
 from errno import EINTR, ECHILD
 from socket import socket, AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import signal, SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
@@ -12,7 +13,12 @@ try:
 except NotImplementedError:
     POOLSIZE = 4
 
-should_exit = multiprocessing.Event()
+exit_flag = multiprocessing.Value(c_bool, False)
+
+
+def should_exit():
+    global exit_flag
+    return exit_flag.value
 
 
 def worker(listen_sock):
@@ -20,14 +26,29 @@ def worker(listen_sock):
     os.dup2(2, 1)
 
     # Manager sends SIGHUP to request termination of workers in the pool
-    def handle_sighup(signum, frame):
-        assert should_exit.is_set()
+    def handle_sighup(*args):
+        assert should_exit()
     signal(SIGHUP, handle_sighup)
 
-    while not should_exit.is_set():
+    # Cleanup zombie children
+    def handle_sigchld(*args):
+        pid = status = None
+        try:
+            while (pid, status) != (0, 0):
+                pid, status = os.waitpid(0, os.WNOHANG)
+        except EnvironmentError as err:
+            if err.errno == EINTR:
+                # retry
+                handle_sigchld()
+            elif err.errno != ECHILD:
+                raise
+    signal(SIGCHLD, handle_sigchld)
+
+    # Handle clients
+    while not should_exit():
         # Wait until a client arrives or we have to exit
         sock = None
-        while not should_exit.is_set() and sock is None:
+        while not should_exit() and sock is None:
             try:
                 sock, addr = listen_sock.accept()
             except EnvironmentError as err:
@@ -35,8 +56,10 @@ def worker(listen_sock):
                     raise
 
         if sock is not None:
-            # Fork to handle the client
-            if os.fork() != 0:
+            # Fork a child to handle the client.
+            # The client is handled in the child so that the manager
+            # never receives SIGCHLD unless a worker crashes.
+            if os.fork() == 0:
                 # Leave the worker pool
                 signal(SIGHUP, SIG_DFL)
                 listen_sock.close()
@@ -49,8 +72,18 @@ def worker(listen_sock):
             else:
                 sock.close()
 
-    assert should_exit.is_set()
-    os._exit(0)
+
+def launch_worker(listen_sock):
+    if os.fork() == 0:
+        try:
+            worker(listen_sock)
+        except Exception as err:
+            import traceback
+            traceback.print_exc()
+            os._exit(1)
+        else:
+            assert should_exit()
+            os._exit(0)
 
 
 def manager():
@@ -66,23 +99,22 @@ def manager():
 
     # Launch initial worker pool
     for idx in range(POOLSIZE):
-        if os.fork() == 0:
-            worker(listen_sock)
-            raise RuntimeError("worker() unexpectedly returned")
+        launch_worker(listen_sock)
     listen_sock.close()
 
     def shutdown():
-        should_exit.set()
+        global exit_flag
+        exit_flag.value = True
 
     # Gracefully exit on SIGTERM, don't die on SIGHUP
     signal(SIGTERM, lambda signum, frame: shutdown())
     signal(SIGHUP, SIG_IGN)
 
     # Cleanup zombie children
-    def handle_sigchld(signum, frame):
+    def handle_sigchld(*args):
         try:
             pid, status = os.waitpid(0, os.WNOHANG)
-            if status != 0 and not should_exit.is_set():
+            if status != 0 and not should_exit():
                 raise RuntimeError("worker crashed: %s, %s" % (pid, status))
         except EnvironmentError as err:
             if err.errno not in (ECHILD, EINTR):
@@ -92,7 +124,7 @@ def manager():
     # Initialization complete
     sys.stdout.close()
     try:
-        while not should_exit.is_set():
+        while not should_exit():
             try:
                 # Spark tells us to exit by closing stdin
                 if os.read(0, 512) == '':
@@ -102,7 +134,8 @@ def manager():
                     shutdown()
                     raise
     finally:
-        should_exit.set()
+        signal(SIGTERM, SIG_DFL)
+        exit_flag.value = True
         # Send SIGHUP to notify workers of shutdown
         os.kill(0, SIGHUP)
 
-- 
GitLab