diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index d29a1a9881cd4b23e63be178504abc482a8e2d96..5aa0b030dbdd810526b1533c368ad781159a4035 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -30,18 +30,16 @@ import scala.reflect.ClassTag
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
-import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob,
+  RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 
 // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
 import org.apache.hadoop.mapred.SparkHadoopWriter
-import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
 
 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
@@ -604,8 +601,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
+
     val wrappedConf = new SerializableWritable(job.getConfiguration)
-    NewFileOutputFormat.setOutputPath(job, new Path(path))
+    val outpath = new Path(path)
+    NewFileOutputFormat.setOutputPath(job, outpath)
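+    // Validate the output specification (e.g. that the output path does not already exist)
+    // on the driver before any tasks are launched; the format instance is reused below.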
+    val jobFormat = outputFormatClass.newInstance
+    jobFormat.checkOutputSpecs(job)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
@@ -633,7 +634,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       committer.commitTask(hadoopContext)
       return 1
     }
-    val jobFormat = outputFormatClass.newInstance
+
     /* apparently we need a TaskAttemptID to construct an OutputCommitter;
      * however we're only going to use this local OutputCommitter for
      * setupJob/commitJob, so we just use a dummy "map" task.
@@ -642,7 +643,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
     jobCommitter.setupJob(jobTaskContext)
-    val count = self.context.runJob(self, writeShard _).sum
+    self.context.runJob(self, writeShard _)
     jobCommitter.commitJob(jobTaskContext)
   }
 
@@ -696,10 +697,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * MapReduce job.
    */
   def saveAsHadoopDataset(conf: JobConf) {
-    val outputFormatClass = conf.getOutputFormat
+    val outputFormatInstance = conf.getOutputFormat
     val keyClass = conf.getOutputKeyClass
     val valueClass = conf.getOutputValueClass
-    if (outputFormatClass == null) {
+    if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
     if (keyClass == null) {
@@ -712,6 +713,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
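+    // As in the new-API save path above, validate the output spec on the driver so that an
+    // invalid destination (typically an existing output directory) fails before any tasks run.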
+    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+      // FileOutputFormat ignores the filesystem parameter
+      val ignoredFs = FileSystem.get(conf)
+      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
+    }
+
     val writer = new SparkHadoopWriter(conf)
     writer.preSetup()
 
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 8ff02aef67aa008f27e8c33bb21ddd07ceb5704f..76173608e9f704712458470a286a97e6f2103a5a 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -24,9 +24,11 @@ import scala.io.Source
 import com.google.common.io.Files
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
+import org.apache.hadoop.mapred.FileAlreadyExistsException
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
 
 class FileSuite extends FunSuite with LocalSparkContext {
 
@@ -208,4 +210,46 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(rdd.count() === 3)
     assert(rdd.count() === 3)
   }
+
+  test ("prevent user from overwriting the empty directory (old Hadoop API)") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
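+    // Files.createTempDir() creates the directory itself, so even though it is empty the
+    // output-spec check should treat it as an existing output path and refuse to write to it.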
+    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    intercept[FileAlreadyExistsException] {
+      randomRDD.saveAsTextFile(tempdir.getPath)
+    }
+  }
+
+  test ("prevent user from overwriting the non-empty directory (old Hadoop API)") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    randomRDD.saveAsTextFile(tempdir.getPath + "/output")
+    assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+    intercept[FileAlreadyExistsException] {
+      randomRDD.saveAsTextFile(tempdir.getPath + "/output")
+    }
+  }
+
+  test ("prevent user from overwriting the empty directory (new Hadoop API)") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    intercept[FileAlreadyExistsException] {
+      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+    }
+  }
+
+  test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsTextFile(tempdir.getPath + "/output")
+    assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+    intercept[FileAlreadyExistsException] {
+      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+    }
+  }
 }