From 12738c1aec136acd7f2e3e2f8f2b541db0890630 Mon Sep 17 00:00:00 2001
From: Bouke van der Bijl <boukevanderbijl@gmail.com>
Date: Wed, 26 Feb 2014 14:50:37 -0800
Subject: [PATCH] SPARK-1115: Catch depickling errors

This surrounds the complete worker code in a try/except block so that we catch any error that occurs, for example the depickling of the command failing.

@JoshRosen

Author: Bouke van der Bijl <boukevanderbijl@gmail.com>

Closes #644 from bouk/catch-depickling-errors and squashes the following commits:

f0f67cc [Bouke van der Bijl] Lol indentation
0e4d504 [Bouke van der Bijl] Surround the complete python worker with the try block
---
 python/pyspark/worker.py | 48 ++++++++++++++++++++--------------------
 1 file changed, 24 insertions(+), 24 deletions(-)
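
Notes: a minimal standalone sketch of the pattern, for context (simplified;
read_command and report_failure below are hypothetical stand-ins, not the
worker's real helpers):

    import pickle
    import sys
    import traceback

    def read_command(infile):
        # Stand-in for pickleSer._read_with_length: depickling can raise
        # on corrupt or incompatible input.
        return pickle.load(infile)

    def report_failure(outfile):
        # Stand-in for reporting the traceback back to the JVM side.
        outfile.write(traceback.format_exc())
        outfile.flush()

    def main(infile, outfile):
        # The try block covers the whole body, so a failure while
        # depickling the command is reported instead of killing the
        # worker silently.
        try:
            func = read_command(infile)
            func()
        except Exception:
            report_failure(outfile)
            sys.exit(-1)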

diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 1586463520..4c214ef359 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -45,34 +45,34 @@ def report_times(outfile, boot, init, finish):
 
 
 def main(infile, outfile):
-    boot_time = time.time()
-    split_index = read_int(infile)
-    if split_index == -1:  # for unit tests
-        return
+    try:
+        boot_time = time.time()
+        split_index = read_int(infile)
+        if split_index == -1:  # for unit tests
+            return
 
-    # fetch name of workdir
-    spark_files_dir = utf8_deserializer.loads(infile)
-    SparkFiles._root_directory = spark_files_dir
-    SparkFiles._is_running_on_worker = True
+        # fetch name of workdir
+        spark_files_dir = utf8_deserializer.loads(infile)
+        SparkFiles._root_directory = spark_files_dir
+        SparkFiles._is_running_on_worker = True
 
-    # fetch names and values of broadcast variables
-    num_broadcast_variables = read_int(infile)
-    for _ in range(num_broadcast_variables):
-        bid = read_long(infile)
-        value = pickleSer._read_with_length(infile)
-        _broadcastRegistry[bid] = Broadcast(bid, value)
+        # fetch names and values of broadcast variables
+        num_broadcast_variables = read_int(infile)
+        for _ in range(num_broadcast_variables):
+            bid = read_long(infile)
+            value = pickleSer._read_with_length(infile)
+            _broadcastRegistry[bid] = Broadcast(bid, value)
 
-    # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
-    sys.path.append(spark_files_dir) # *.py files that were added will be copied here
-    num_python_includes =  read_int(infile)
-    for _ in range(num_python_includes):
-        filename = utf8_deserializer.loads(infile)
-        sys.path.append(os.path.join(spark_files_dir, filename))
+        # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
+        sys.path.append(spark_files_dir) # *.py files that were added will be copied here
+        num_python_includes =  read_int(infile)
+        for _ in range(num_python_includes):
+            filename = utf8_deserializer.loads(infile)
+            sys.path.append(os.path.join(spark_files_dir, filename))
 
-    command = pickleSer._read_with_length(infile)
-    (func, deserializer, serializer) = command
-    init_time = time.time()
-    try:
+        command = pickleSer._read_with_length(infile)
+        (func, deserializer, serializer) = command
+        init_time = time.time()
         iterator = deserializer.load_stream(infile)
         serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception as e:
-- 
GitLab