diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f2ce3cbd47f93c333061eafef2f674bdc689f6ad..8909980957058ef71baaf341d24a8044323b3e66 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -737,7 +737,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
-    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+      jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -803,7 +804,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
-    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
+      outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
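Both hunks above gate the output-spec check on the same Spark property, read from the job's SparkConf before delegating to Hadoop's `checkOutputSpecs`. A minimal sketch of how a driver program might opt out of the check, assuming a local master and an illustrative output path (neither is part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver: with spark.hadoop.validateOutputSpecs set to false,
// saving to a path that already exists no longer fails the job up front.
object DisableOutputSpecCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("disable-output-spec-check")
      .setMaster("local")
      .set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)

    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")), 1)
    pairs.saveAsTextFile("/tmp/disable-output-spec-check/output")
    // Second save into the same directory: the existence check is skipped,
    // so existing part files are overwritten instead of aborting the job.
    pairs.saveAsTextFile("/tmp/disable-output-spec-check/output")

    sc.stop()
  }
}
```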
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 1f2206b1f03796681118579418eafebdf0f75132..070e974657860042695277f6c47260e1e0eefa27 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -230,6 +230,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test ("allow user to disable the output directory existence checking (old Hadoop API") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
+  }
+
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
@@ -248,6 +259,17 @@ class FileSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test ("allow user to disable the output directory existence checking (new Hadoop API") {
+    val sf = new SparkConf()
+    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
+    sc = new SparkContext(sf)
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
+  }
+
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
diff --git a/docs/configuration.md b/docs/configuration.md
index 0697f7fc2fd91601b8d652f4b3d94ba5fa7d6c08..71fafa573467f035cde8b82ae91237a41e2a7716 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -487,6 +487,14 @@ Apart from these, the following properties are also available, and may be useful
     this duration will be cleared as well.
   </td>
 </tr>
+<tr>
+    <td>spark.hadoop.validateOutputSpecs</td>
+    <td>true</td>
+    <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
+    used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
+    output directories. We recommend that users do not disable this except when trying to achieve compatibility
+    with previous versions of Spark; instead, use Hadoop's FileSystem API to delete output directories by hand.</td>
+</tr>
 </table>
 
 #### Networking
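For the alternative recommended in the new table row, a small sketch of clearing a stale output directory through Hadoop's FileSystem API before saving, so the default validation can stay enabled (the helper name and the way the configuration is obtained are illustrative):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

// Hypothetical helper: delete a pre-existing output directory so the job
// passes output-spec validation without touching spark.hadoop.validateOutputSpecs.
def deleteOutputDir(sc: SparkContext, dir: String): Unit = {
  val path = new Path(dir)
  val fs: FileSystem = path.getFileSystem(sc.hadoopConfiguration)
  if (fs.exists(path)) {
    fs.delete(path, true) // recursive delete
  }
}
```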