diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 7bd104b8d5b9447ed2fe50503d2b1760d3ddd7c3..4bbad908d007a3a36dbc9d8f61734676235cf0d3 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -42,7 +42,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { private val writeFile = new Path(file.getParent, file.getName + ".next") private val bakFile = new Path(file.getParent, file.getName + ".bk") - @volatile private var stopped = false + private var stopped = false val conf = new Configuration() var fs = file.getFileSystem(conf) @@ -57,10 +57,6 @@ class CheckpointWriter(checkpointDir: String) extends Logging { var attempts = 0 val startTime = System.currentTimeMillis() while (attempts < maxAttempts) { - if (stopped) { - logInfo("Already stopped, ignore checkpoint attempt for " + file) - return - } attempts += 1 try { logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") @@ -99,8 +95,13 @@ class CheckpointWriter(checkpointDir: String) extends Logging { } def stop() { - stopped = true + synchronized { + if (stopped) return ; + stopped = true + } executor.shutdown() + val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS) + logInfo("CheckpointWriter executor terminated ? " + terminated) } } diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index adb7f3a24d25f6fcbd1453c2e75b56b9a22d10b4..3b331956f5973c07af145939d556baa7875a72a0 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -54,8 +54,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { throw new Exception("Batch duration already set as " + batchDuration + ". cannot set it again.") } + batchDuration = duration } - batchDuration = duration } def remember(duration: Duration) {