Skip to content
Snippets Groups Projects
Commit 7fa6978a authored by Mridul Muralidharan's avatar Mridul Muralidharan
Browse files

Allow CheckpointWriter pending tasks to finish

parent 00c7a376
No related branches found
No related tags found
No related merge requests found
...@@ -42,7 +42,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { ...@@ -42,7 +42,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
private val writeFile = new Path(file.getParent, file.getName + ".next") private val writeFile = new Path(file.getParent, file.getName + ".next")
private val bakFile = new Path(file.getParent, file.getName + ".bk") private val bakFile = new Path(file.getParent, file.getName + ".bk")
@volatile private var stopped = false private var stopped = false
val conf = new Configuration() val conf = new Configuration()
var fs = file.getFileSystem(conf) var fs = file.getFileSystem(conf)
...@@ -57,10 +57,6 @@ class CheckpointWriter(checkpointDir: String) extends Logging { ...@@ -57,10 +57,6 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
var attempts = 0 var attempts = 0
val startTime = System.currentTimeMillis() val startTime = System.currentTimeMillis()
while (attempts < maxAttempts) { while (attempts < maxAttempts) {
if (stopped) {
logInfo("Already stopped, ignore checkpoint attempt for " + file)
return
}
attempts += 1 attempts += 1
try { try {
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
...@@ -99,8 +95,13 @@ class CheckpointWriter(checkpointDir: String) extends Logging { ...@@ -99,8 +95,13 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
} }
def stop() { def stop() {
stopped = true synchronized {
if (stopped) return ;
stopped = true
}
executor.shutdown() executor.shutdown()
val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
logInfo("CheckpointWriter executor terminated ? " + terminated)
} }
} }
......
...@@ -54,8 +54,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { ...@@ -54,8 +54,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
throw new Exception("Batch duration already set as " + batchDuration + throw new Exception("Batch duration already set as " + batchDuration +
". cannot set it again.") ". cannot set it again.")
} }
batchDuration = duration
} }
batchDuration = duration
} }
def remember(duration: Duration) { def remember(duration: Duration) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment