From 0bc0a60d3001dd231e13057a838d4b6550e5a2b9 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Thu, 27 Dec 2012 15:37:33 -0800
Subject: [PATCH] Modifications to make sure LocalScheduler terminates cleanly
 without errors when SparkContext is shut down, to minimize spurious exceptions
 during master failure tests.
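
LocalScheduler's task threads could call back into a DAGScheduler that had
already been stopped, producing spurious exceptions while the master failure
tests tore contexts down. Concretely, this patch (1) moves all of
SparkContext.stop()'s teardown inside the dagScheduler != null guard so a
second stop() is a logged no-op, (2) makes LocalScheduler skip the taskEnded
callback when its worker thread has been interrupted by threadpool shutdown,
(3) redirects core and streaming test logging to spark-tests.log and
streaming-tests.log, and (4) clears spark.master.port in ClosureCleanerSuite
so the next test's Akka ActorSystem can rebind to the port.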

---
 core/src/main/scala/spark/SparkContext.scala  | 22 ++++++++++---------
 .../scheduler/local/LocalScheduler.scala      |  8 +++++--
 core/src/test/resources/log4j.properties      |  2 +-
 .../scala/spark/ClosureCleanerSuite.scala     |  2 ++
 streaming/src/test/resources/log4j.properties | 13 ++++++-----
 5 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index caa9a1794b..0c8b0078a3 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -488,17 +488,19 @@ class SparkContext(
     if (dagScheduler != null) {
       dagScheduler.stop()
       dagScheduler = null
+      taskScheduler = null
+      // TODO: Cache.stop()?
+      env.stop()
+      // Clean up locally linked files
+      clearFiles()
+      clearJars()
+      SparkEnv.set(null)
+      ShuffleMapTask.clearCache()
+      ResultTask.clearCache()
+      logInfo("Successfully stopped SparkContext")
+    } else {
+      logInfo("SparkContext already stopped")
     }
-    taskScheduler = null
-    // TODO: Cache.stop()?
-    env.stop()
-    // Clean up locally linked files
-    clearFiles()
-    clearJars()
-    SparkEnv.set(null)
-    ShuffleMapTask.clearCache()
-    ResultTask.clearCache()
-    logInfo("Successfully stopped SparkContext")
   }
 
   /**
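
The SparkContext.stop() hunk above makes shutdown idempotent: every teardown
step now runs only while dagScheduler is still non-null, and a second call
just logs and returns. A minimal standalone sketch of that guard pattern
(the class and field names are illustrative, not the real SparkContext
internals):

    class StoppableService {
      private var started = true

      def stop() {
        if (started) {
          started = false
          // Release resources exactly once
          println("Successfully stopped")
        } else {
          println("Already stopped")
        }
      }
    }

    // Calling stop() twice is now safe:
    //   val s = new StoppableService; s.stop(); s.stop()
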
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index eb20fe41b2..17a0a4b103 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -81,7 +81,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
         val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
           ser.serialize(Accumulators.values))
         logInfo("Finished task " + idInJob)
-        listener.taskEnded(task, Success, resultToReturn, accumUpdates)
+
+        // If the threadpool has not already been shut down, notify DAGScheduler
+        if (!Thread.currentThread().isInterrupted)
+          listener.taskEnded(task, Success, resultToReturn, accumUpdates)
       } catch {
         case t: Throwable => {
           logError("Exception in task " + idInJob, t)
@@ -91,7 +94,8 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
               submitTask(task, idInJob)
             } else {
               // TODO: Do something nicer here to return all the way to the user
-              listener.taskEnded(task, new ExceptionFailure(t), null, null)
+              if (!Thread.currentThread().isInterrupted)
+                listener.taskEnded(task, new ExceptionFailure(t), null, null)
             }
           }
         }
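
The isInterrupted checks above work because shutting the threadpool down
interrupts its worker threads; once a task sees the interrupt flag, it skips
the listener callback instead of racing a stopped DAGScheduler. A
self-contained sketch of the same pattern (the pool and task body here are
hypothetical, not LocalScheduler's actual code):

    import java.util.concurrent.{Executors, TimeUnit}

    object InterruptCheckDemo extends App {
      val pool = Executors.newFixedThreadPool(1)

      pool.submit(new Runnable {
        def run() {
          try {
            Thread.sleep(1000) // simulate task work
          } catch {
            case _: InterruptedException =>
              Thread.currentThread().interrupt() // restore the flag
          }
          // Notify the listener only if we were not shut down mid-task
          if (!Thread.currentThread().isInterrupted)
            println("taskEnded would be delivered")
          else
            println("pool shut down; skipping callback")
        }
      })

      Thread.sleep(100)  // let the task start before shutting down
      pool.shutdownNow() // interrupts the in-flight task
      pool.awaitTermination(5, TimeUnit.SECONDS)
    }
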
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 4c99e450bc..5ed388e91b 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -1,4 +1,4 @@
-# Set everything to be logged to the console
+# Set everything to be logged to the file spark-tests.log
 log4j.rootCategory=INFO, file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
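
For reference, the core test config above routes everything at INFO and up to
spark-tests.log through a non-appending FileAppender (the streaming file below
mirrors it with streaming-tests.log). The same setup expressed against the
log4j 1.2 API, as a sketch of what the properties resolve to rather than code
in this patch:

    import org.apache.log4j.{FileAppender, Level, Logger, PatternLayout}

    object TestLogSetup {
      def configure() {
        val layout = new PatternLayout("%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n")
        val root = Logger.getRootLogger
        root.setLevel(Level.INFO)
        // append = false, so each test run starts with a fresh log file
        root.addAppender(new FileAppender(layout, "spark-tests.log", false))
      }
    }
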
diff --git a/core/src/test/scala/spark/ClosureCleanerSuite.scala b/core/src/test/scala/spark/ClosureCleanerSuite.scala
index 7c0334d957..dfa2de80e6 100644
--- a/core/src/test/scala/spark/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/spark/ClosureCleanerSuite.scala
@@ -47,6 +47,8 @@ object TestObject {
     val nums = sc.parallelize(Array(1, 2, 3, 4))
     val answer = nums.map(_ + x).reduce(_ + _)
     sc.stop()
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port")
     return answer
   }
 }
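
Clearing spark.master.port right after sc.stop() matters because Akka does not
unbind its port immediately; without it, the next test that creates a
SparkContext in the same JVM can fail to bind. A sketch of the per-test
teardown this enables (the trait name and ScalaTest wiring are illustrative):

    import org.scalatest.{BeforeAndAfterEach, FunSuite}
    import spark.SparkContext

    trait LocalSparkCleanup extends BeforeAndAfterEach { self: FunSuite =>
      @transient var sc: SparkContext = _

      override def afterEach() {
        if (sc != null) {
          sc.stop()
          sc = null
        }
        // Let the next test's Akka ActorSystem rebind to a fresh port
        System.clearProperty("spark.master.port")
        super.afterEach()
      }
    }
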
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 02fe16866e..33bafebaab 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,8 +1,11 @@
-# Set everything to be logged to the console
-log4j.rootCategory=WARN, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+# Set everything to be logged to the file streaming-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
 
 # Ignore messages below warning level from Jetty, because it's a bit verbose
 log4j.logger.org.eclipse.jetty=WARN
+
-- 
GitLab