From afee9024430ef79cc0840a5e5788b60c8c53f9d2 Mon Sep 17 00:00:00 2001
From: Mridul Muralidharan <mridul@gmail.com>
Date: Sun, 28 Apr 2013 22:26:45 +0530
Subject: [PATCH] Attempt to fix streaming test failures after yarn branch
 merge

---
 bagel/src/test/scala/bagel/BagelSuite.scala   |  1 +
 .../test/scala/spark/LocalSparkContext.scala  |  3 +-
 .../src/test/scala/spark/repl/ReplSuite.scala |  1 +
 .../scala/spark/streaming/Checkpoint.scala    | 30 ++++++++++++++-----
 .../streaming/util/MasterFailureTest.scala    |  8 ++++-
 .../streaming/BasicOperationsSuite.scala      |  1 +
 .../spark/streaming/CheckpointSuite.scala     |  4 ++-
 .../scala/spark/streaming/FailureSuite.scala  |  2 ++
 .../spark/streaming/InputStreamsSuite.scala   |  1 +
 .../streaming/WindowOperationsSuite.scala     |  1 +
 10 files changed, 42 insertions(+), 10 deletions(-)

diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 25db395c22..a09c978068 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -23,6 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
     }
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   test("halting by voting") {
diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala
index ff00dd05dd..76d5258b02 100644
--- a/core/src/test/scala/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/spark/LocalSparkContext.scala
@@ -27,6 +27,7 @@ object LocalSparkContext {
     sc.stop()
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
@@ -38,4 +39,4 @@ object LocalSparkContext {
     }
   }
 
-}
\ No newline at end of file
+}
diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala
index 43559b96d3..1c64f9b98d 100644
--- a/repl/src/test/scala/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuite.scala
@@ -32,6 +32,7 @@ class ReplSuite extends FunSuite {
       interp.sparkContext.stop()
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
     return out.toString
   }
   
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index e303e33e5e..7bd104b8d5 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -38,28 +38,43 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
 private[streaming]
 class CheckpointWriter(checkpointDir: String) extends Logging {
   val file = new Path(checkpointDir, "graph")
+  // The file to which we actually write - and then "move" to file.
+  private val writeFile = new Path(file.getParent, file.getName + ".next")
+  private val bakFile = new Path(file.getParent, file.getName + ".bk")
+
+  @volatile private var stopped = false
+
   val conf = new Configuration()
   var fs = file.getFileSystem(conf)
   val maxAttempts = 3
   val executor = Executors.newFixedThreadPool(1)
 
+  // Removed code which validates whether there is only one CheckpointWriter per path 'file' since 
+  // I did not notice any errors - reintroduce it ?
+
   class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
     def run() {
       var attempts = 0
       val startTime = System.currentTimeMillis()
       while (attempts < maxAttempts) {
+        if (stopped) {
+          logInfo("Already stopped, ignore checkpoint attempt for " + file)
+          return
+        }
         attempts += 1
         try {
           logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
-          if (fs.exists(file)) {
-            val bkFile = new Path(file.getParent, file.getName + ".bk")
-            FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
-            logDebug("Moved existing checkpoint file to " + bkFile)
-          }
-          val fos = fs.create(file)
+          // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast.
+          val fos = fs.create(writeFile)
           fos.write(bytes)
           fos.close()
-          fos.close()
+          if (fs.exists(file) && fs.rename(file, bakFile)) {
+            logDebug("Moved existing checkpoint file to " + bakFile)
+          }
+          // paranoia
+          fs.delete(file, false)
+          fs.rename(writeFile, file)
+
           val finishTime = System.currentTimeMillis();
           logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
             "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
@@ -84,6 +99,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
   }
 
   def stop() {
+    stopped = true
     executor.shutdown()
   }
 }
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index f673e5be15..e7a3f92bc0 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -74,6 +74,7 @@ object MasterFailureTest extends Logging {
 
     val operation = (st: DStream[String]) => {
       val updateFunc = (values: Seq[Long], state: Option[Long]) => {
+        logInfo("UpdateFunc .. state = " + state.getOrElse(0L) + ", values = " + values)
         Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L))
       }
       st.flatMap(_.split(" "))
@@ -159,6 +160,7 @@ object MasterFailureTest extends Logging {
 
     // Setup the streaming computation with the given operation
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
     var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
     ssc.checkpoint(checkpointDir.toString)
     val inputStream = ssc.textFileStream(testDir.toString)
@@ -205,6 +207,7 @@ object MasterFailureTest extends Logging {
         // (iii) Its not timed out yet
         System.clearProperty("spark.streaming.clock")
         System.clearProperty("spark.driver.port")
+        System.clearProperty("spark.hostPort")
         ssc.start()
         val startTime = System.currentTimeMillis()
         while (!killed && !isLastOutputGenerated && !isTimedOut) {
@@ -357,13 +360,16 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
         // Write the data to a local file and then move it to the target test directory
         val localFile = new File(localTestDir, (i+1).toString)
         val hadoopFile = new Path(testDir, (i+1).toString)
+        val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
         FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
         var tries = 0
 	var done = false
         while (!done && tries < maxTries) {
           tries += 1
           try {
-            fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+            // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+            fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
+            fs.rename(tempHadoopFile, hadoopFile)
 	    done = true
 	  } catch {
 	    case ioe: IOException => { 
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index cf2ed8b1d4..e7352deb81 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -15,6 +15,7 @@ class BasicOperationsSuite extends TestSuiteBase {
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   test("map") {
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index cac86deeaf..607dea77ec 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -31,6 +31,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
 
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   var ssc: StreamingContext = null
@@ -325,6 +326,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     )
     ssc = new StreamingContext(checkpointDir)
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
     ssc.start()
     val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
     // the first element will be re-processed data of the last batch before restart
@@ -350,4 +352,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
     outputStream.output
   }
-}
\ No newline at end of file
+}
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index a5fa7ab92d..4529e774e9 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -22,10 +22,12 @@ class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
   val batchDuration = Milliseconds(1000)
 
   before {
+    logInfo("BEFORE ...")
     FileUtils.deleteDirectory(new File(directory))
   }
 
   after {
+    logInfo("AFTER ...")
     FileUtils.deleteDirectory(new File(directory))
   }
 
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 67dca2ac31..0acb6db6f2 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -41,6 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
 
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 1b66f3bda2..80d827706f 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -16,6 +16,7 @@ class WindowOperationsSuite extends TestSuiteBase {
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   val largerSlideInput = Seq(
-- 
GitLab