From a7d145e98c55fa66a541293930f25d9cdc25f3b4 Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@apache.org>
Date: Sun, 27 Jul 2014 22:54:43 -0700
Subject: [PATCH] [SPARK-1550] [PySpark] Allow SparkContext creation after
 failed attempts

This addresses a PySpark issue where a failed attempt to construct a
SparkContext would prevent any future SparkContext creation: the
partially-initialized context stayed registered as the active context,
so every subsequent constructor call raised "Cannot run multiple
SparkContexts at once". The fix moves initialization into a _do_init()
helper wrapped in try/except; on failure, stop() (now tolerant of
partially-initialized contexts) cleans up before re-raising.
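
A minimal sketch of the failure mode (assumes a local PySpark
installation; the master URL below is deliberately invalid, mirroring
the regression test added by this patch):

    from pyspark import SparkContext

    try:
        # Fails: "an-invalid-master-name" is not a valid master URL.
        SparkContext("an-invalid-master-name")
    except Exception:
        pass

    # Before this patch, this raised "Cannot run multiple SparkContexts
    # at once"; with it, the new context starts normally.
    sc = SparkContext("local")
    sc.stop()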

Author: Josh Rosen <joshrosen@apache.org>

Closes #1606 from JoshRosen/SPARK-1550 and squashes the following commits:

ec7fadc [Josh Rosen] [SPARK-1550] [PySpark] Allow SparkContext creation after failed attempts
---
 python/pyspark/context.py | 18 ++++++++++++------
 python/pyspark/tests.py   |  6 ++++++
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index e8ac9895cf..830a6ee03f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -100,7 +100,16 @@ class SparkContext(object):
             tempNamedTuple = namedtuple("Callsite", "function file linenum")
             self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
         SparkContext._ensure_initialized(self, gateway=gateway)
-
+        try:
+            self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
+                          conf)
+        except BaseException:
+            # If an error occurs, clean up in order to allow future SparkContext creation:
+            self.stop()
+            raise
+
+    def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
+                 conf):
         self.environment = environment or {}
         self._conf = conf or SparkConf(_jvm=self._jvm)
         self._batchSize = batchSize  # -1 represents an unlimited batch size
@@ -249,17 +258,14 @@ class SparkContext(object):
         """
         return self._jsc.sc().defaultMinPartitions()
 
-    def __del__(self):
-        self.stop()
-
     def stop(self):
         """
         Shut down the SparkContext.
         """
-        if self._jsc:
+        if getattr(self, "_jsc", None):
             self._jsc.stop()
             self._jsc = None
-        if self._accumulatorServer:
+        if getattr(self, "_accumulatorServer", None):
             self._accumulatorServer.shutdown()
             self._accumulatorServer = None
         with SparkContext._lock:
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8ba51461d1..63cc5e9ad9 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -209,6 +209,12 @@ class TestAddFile(PySparkTestCase):
 
 class TestRDDFunctions(PySparkTestCase):
 
+    def test_failed_sparkcontext_creation(self):
+        # Regression test for SPARK-1550
+        self.sc.stop()
+        self.assertRaises(Exception, lambda: SparkContext("an-invalid-master-name"))
+        self.sc = SparkContext("local")
+
     def test_save_as_textfile_with_unicode(self):
         # Regression test for SPARK-970
         x = u"\u00A1Hola, mundo!"
-- 
GitLab