Commit 6e6b5204 authored by Matei Zaharia

Create an empty directory when checkpointing a 0-partition RDD (fixes a test failure on Hadoop 2.0)
parent eef9ea19
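
For context, here is a rough sketch of the scenario the commit message describes (the RDD construction, app name, and checkpoint path are illustrative for this era of the API, not taken from the commit). Checkpointing runs a job over the RDD's partitions, so a 0-partition RDD launches zero tasks: CheckpointRDD.writeToFile never executes, the "rdd-<id>" directory is never created, and reloading the checkpoint fails on Hadoop 2.0.

import spark.SparkContext
import spark.rdd.BlockRDD

object ZeroPartitionCheckpoint {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "zero-partition-checkpoint")
    sc.setCheckpointDir("/tmp/spark-checkpoints")  // hypothetical local path

    // A BlockRDD over an empty block list has zero partitions, so the
    // checkpoint job below runs zero tasks and writes no files.
    val empty = new BlockRDD[Int](sc, Array[String]())
    empty.checkpoint()
    empty.count()  // forces the checkpoint; failed on Hadoop 2.0 before this fix

    sc.stop()
  }
}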
 package spark
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
 import rdd.{CheckpointRDD, CoalescedRDD}
 import scheduler.{ResultTask, ShuffleMapTask}

@@ -62,14 +63,20 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
       }
     }
 
+    // Create the output path for the checkpoint
+    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
+    val fs = path.getFileSystem(new Configuration())
+    if (!fs.mkdirs(path)) {
+      throw new SparkException("Failed to create checkpoint path " + path)
+    }
+
     // Save to file, and reload it as an RDD
-    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id).toString
-    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _)
-    val newRDD = new CheckpointRDD[T](rdd.context, path)
+    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
+    val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
 
     // Change the dependencies and partitions of the RDD
     RDDCheckpointData.synchronized {
-      cpFile = Some(path)
+      cpFile = Some(path.toString)
       cpRDD = Some(newRDD)
       rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
       cpState = Checkpointed
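
Restating the core of the fix as a standalone sketch against the plain Hadoop FileSystem API (the checkpoint location and object name below are hypothetical): the directory is created eagerly on the driver, so it exists even when the subsequent job writes no part files into it.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object EnsureCheckpointDir {
  def main(args: Array[String]) {
    // Hypothetical checkpoint location, mirroring the "rdd-<id>" naming above.
    val path = new Path("/tmp/spark-checkpoints", "rdd-42")
    val fs = path.getFileSystem(new Configuration())

    // mkdirs returns true when the directory exists after the call, so a
    // false return means the path could not be created and is fatal.
    if (!fs.mkdirs(path)) {
      throw new RuntimeException("Failed to create checkpoint path " + path)
    }

    // A reader can now list the (possibly empty) directory instead of
    // failing on a missing path.
    println("Checkpoint dir exists: " + fs.exists(path))
  }
}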