From 6e6b5204ea015fc7cc2c3e16e0032be3074413be Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Wed, 24 Apr 2013 18:53:12 -0700
Subject: [PATCH] Create an empty directory when checkpointing a 0-partition
 RDD (fixes a test failure on Hadoop 2.0)

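Checkpointing runs a job that writes each partition to a file under
<checkpointDir>/rdd-<id>, then reloads those files as a CheckpointRDD.
With a 0-partition RDD the job launches no tasks, so no files are written
and the directory itself never gets created; reloading then fails on
Hadoop 2.0, presumably because FileSystem.listStatus there throws
FileNotFoundException for a missing path where Hadoop 1.x was more
lenient. Creating the directory up front (and failing fast when mkdirs
returns false) lets the reload see an empty directory instead, and also
surfaces permission problems as a SparkException at checkpoint time
rather than an obscure failure at reload time.

A minimal sketch of the idea outside Spark (the path is hypothetical,
and the listStatus behavior is the assumed Hadoop 2.0 semantics):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val path = new Path("/tmp/spark-checkpoints/rdd-0") // hypothetical checkpoint dir
    val fs = path.getFileSystem(new Configuration())
    // Without mkdirs, listStatus on the missing path would throw
    // FileNotFoundException on Hadoop 2.0; with it, we get an empty
    // listing, which is the correct result for a 0-partition RDD.
    if (!fs.mkdirs(path)) {
      throw new RuntimeException("Failed to create checkpoint path " + path)
    }
    val files = fs.listStatus(path) // empty array: nothing to reload
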
---
 core/src/main/scala/spark/RDDCheckpointData.scala | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
index d00092e984..57e0405fb4 100644
--- a/core/src/main/scala/spark/RDDCheckpointData.scala
+++ b/core/src/main/scala/spark/RDDCheckpointData.scala
@@ -1,6 +1,7 @@
 package spark
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
 import rdd.{CheckpointRDD, CoalescedRDD}
 import scheduler.{ResultTask, ShuffleMapTask}
 
@@ -62,14 +63,20 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
       }
     }
 
+    // Create the output path for the checkpoint
+    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
+    val fs = path.getFileSystem(new Configuration())
+    if (!fs.mkdirs(path)) {
+      throw new SparkException("Failed to create checkpoint path " + path)
+    }
+
     // Save to file, and reload it as an RDD
-    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id).toString
-    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _)
-    val newRDD = new CheckpointRDD[T](rdd.context, path)
+    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
+    val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
 
     // Change the dependencies and partitions of the RDD
     RDDCheckpointData.synchronized {
-      cpFile = Some(path)
+      cpFile = Some(path.toString)
       cpRDD = Some(newRDD)
       rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
       cpState = Checkpointed
-- 
GitLab