From fd833e7ab1bf006c5ae1dff4767b02729e1bbfa7 Mon Sep 17 00:00:00 2001
From: Yinan Li <liyinan926@gmail.com>
Date: Fri, 17 Jan 2014 17:27:25 -0800
Subject: [PATCH] Allow files added through SparkContext.addFile() to be
 overwritten

This is useful for the cases when a file needs to be refreshed and downloaded
by the executors periodically.

Signed-off-by: Yinan Li <liyinan926@gmail.com>
---
 .../scala/org/apache/spark/util/Utils.scala   | 49 +++++++++++++------
 docs/configuration.md                         |  8 +++
 2 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index caa9bf4c92..e1f8e9520c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -268,6 +268,7 @@ private[spark] object Utils extends Logging {
     val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
+    val fileOverwrite = System.getProperty("spark.files.overwrite", "false").toBoolean
     uri.getScheme match {
       case "http" | "https" | "ftp" =>
         logInfo("Fetching " + url + " to " + tempFile)
@@ -275,47 +276,65 @@ private[spark] object Utils extends Logging {
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          tempFile.delete()
-          throw new SparkException(
-            "File " + targetFile + " exists and does not match contents of" + " " + url)
-        } else {
-          Files.move(tempFile, targetFile)
+          if (fileOverwrite) {
+            targetFile.delete()
+            logInfo(("File %s exists and does not match contents of %s, " +
+              "replacing it with %s").format(targetFile, url, url))
+          } else {
+            tempFile.delete()
+            throw new SparkException(
+              "File " + targetFile + " exists and does not match contents of" + " " + url)
+          }
         }
+        Files.move(tempFile, targetFile)
       case "file" | null =>
         // In the case of a local file, copy the local file to the target directory.
         // Note the difference between uri vs url.
         val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
+        var shouldCopy = true
         if (targetFile.exists) {
-          // If the target file already exists, warn the user if
           if (!Files.equal(sourceFile, targetFile)) {
-            throw new SparkException(
-              "File " + targetFile + " exists and does not match contents of" + " " + url)
+            if (fileOverwrite) {
+              targetFile.delete()
+              logInfo(("File %s exists and does not match contents of %s, " +
+                "replacing it with %s").format(targetFile, url, url))
+            } else {
+              throw new SparkException(
+                "File " + targetFile + " exists and does not match contents of" + " " + url)
+            }
           } else {
             // Do nothing if the file contents are the same, i.e. this file has been copied
             // previously.
             logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
               + targetFile.getAbsolutePath)
+            shouldCopy = false
           }
-        } else {
+        }
+
+        if (shouldCopy) {
           // The file does not exist in the target directory. Copy it there.
           logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
           Files.copy(sourceFile, targetFile)
         }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
-        val uri = new URI(url)
         val conf = SparkHadoopUtil.get.newConfiguration()
         val fs = FileSystem.get(uri, conf)
         val in = fs.open(new Path(uri))
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          tempFile.delete()
-          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
-            " " + url)
-        } else {
-          Files.move(tempFile, targetFile)
+          if (fileOverwrite) {
+            targetFile.delete()
+            logInfo(("File %s exists and does not match contents of %s, " +
+              "replacing it with %s").format(targetFile, url, url))
+          } else {
+            tempFile.delete()
+            throw new SparkException(
+              "File " + targetFile + " exists and does not match contents of" + " " + url)
+          }
         }
+        Files.move(tempFile, targetFile)
     }
     // Decompress the file if it's a .tar or .tar.gz
     if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
diff --git a/docs/configuration.md b/docs/configuration.md
index da70cabba2..3b565e4347 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -431,6 +431,7 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+<<<<<<< HEAD
   <td>spark.logConf</td>
   <td>false</td>
   <td>
@@ -459,6 +460,13 @@ Apart from these, the following properties are also available, and may be useful
     the whole cluster by default. <br/>
     <b>Note:</b> this setting needs to be configured in the standalone cluster master, not in individual
     applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
+</td>
+</tr>
+<tr>
+  <td>spark.files.overwrite</td>
+  <td>false</td>
+  <td>
+    Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source.
   </td>
 </tr>
 </table>
-- 
GitLab