diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a6853fe3989a8bcd45bfac894d73a4a0bb2caa81..60f042f1e07c582e49d82404caef0200dc72a3c7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1410,9 +1410,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     val scheme = new URI(schemeCorrectedPath).getScheme
     if (!Array("http", "https", "ftp").contains(scheme)) {
       val fs = hadoopPath.getFileSystem(hadoopConfiguration)
-      if (!fs.exists(hadoopPath)) {
-        throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
-      }
       val isDir = fs.getFileStatus(hadoopPath).isDirectory
       if (!isLocal && scheme == "file" && isDir) {
         throw new SparkException(s"addFile does not support local directories when not running " +
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index bc09935f93f804a9df556834fe237eb7e9821fbd..6874aa5f938ac471aca2b70ebf56f786c89a8478 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -193,16 +193,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private def startPolling(): Unit = {
     // Validate the log directory.
     val path = new Path(logDir)
-    if (!fs.exists(path)) {
-      var msg = s"Log directory specified does not exist: $logDir"
-      if (logDir == DEFAULT_LOG_DIR) {
-        msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+    try {
+      if (!fs.getFileStatus(path).isDirectory) {
+        throw new IllegalArgumentException(
+          "Logging directory specified is not a directory: %s".format(logDir))
       }
-      throw new IllegalArgumentException(msg)
-    }
-    if (!fs.getFileStatus(path).isDirectory) {
-      throw new IllegalArgumentException(
-        "Logging directory specified is not a directory: %s".format(logDir))
+    } catch {
+      case f: FileNotFoundException =>
+        var msg = s"Log directory specified does not exist: $logDir"
+        if (logDir == DEFAULT_LOG_DIR) {
+          msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+        }
+        throw new FileNotFoundException(msg).initCause(f)
     }
 
     // Disable the background thread during tests.
@@ -495,12 +497,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
       attemptsToClean.foreach { attempt =>
         try {
-          val path = new Path(logDir, attempt.logPath)
-          if (fs.exists(path)) {
-            if (!fs.delete(path, true)) {
-              logWarning(s"Error deleting ${path}")
-            }
-          }
+          fs.delete(new Path(logDir, attempt.logPath), true)
         } catch {
           case e: AccessControlException =>
             logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index fddb9353018a8afe670f0311bff0eeba74c3c8f4..ab6554fd8a7e7ee0acc52715335d965b5a7e497c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -166,9 +166,6 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     val tempOutputPath =
       new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
 
-    if (fs.exists(tempOutputPath)) {
-      throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
-    }
     val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
 
     val fileOutputStream = if (blockSize < 0) {
@@ -240,22 +237,20 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
       val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
       val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
-      if (fs.exists(partitionerFilePath)) {
-        val fileInputStream = fs.open(partitionerFilePath, bufferSize)
-        val serializer = SparkEnv.get.serializer.newInstance()
-        val deserializeStream = serializer.deserializeStream(fileInputStream)
-        val partitioner = Utils.tryWithSafeFinally[Partitioner] {
-          deserializeStream.readObject[Partitioner]
-        } {
-          deserializeStream.close()
-        }
-        logDebug(s"Read partitioner from $partitionerFilePath")
-        Some(partitioner)
-      } else {
-        logDebug("No partitioner file")
-        None
+      val fileInputStream = fs.open(partitionerFilePath, bufferSize)
+      val serializer = SparkEnv.get.serializer.newInstance()
+      val deserializeStream = serializer.deserializeStream(fileInputStream)
+      val partitioner = Utils.tryWithSafeFinally[Partitioner] {
+        deserializeStream.readObject[Partitioner]
+      } {
+        deserializeStream.close()
       }
+      logDebug(s"Read partitioner from $partitionerFilePath")
+      Some(partitioner)
     } catch {
+      case e: FileNotFoundException =>
+        logDebug("No partitioner file", e)
+        None
       case NonFatal(e) =>
         logWarning(s"Error reading partitioner from $checkpointDirPath, " +
             s"partitioner will not be recovered which may lead to performance loss", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 74f187642af21a325d2f00d0b0e0d9c9ae14dc56..b6d723c682796bfedb54751aeb41641dc6d8bfb4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -80,12 +80,7 @@ private[spark] object ReliableRDDCheckpointData extends Logging {
   /** Clean up the files associated with the checkpoint data for this RDD. */
   def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
     checkpointPath(sc, rddId).foreach { path =>
-      val fs = path.getFileSystem(sc.hadoopConfiguration)
-      if (fs.exists(path)) {
-        if (!fs.delete(path, true)) {
-          logWarning(s"Error deleting ${path.toString()}")
-        }
-      }
+      path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a7d06391176d2f503972c86b64c9d4744c80142c..ce7877469f03f6908e5d5bceaae5a85250ccee20 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -91,7 +91,7 @@ private[spark] class EventLoggingListener(
    */
   def start() {
     if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
-      throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
+      throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
     }
 
     val workingPath = logPath + IN_PROGRESS
@@ -100,11 +100,8 @@ private[spark] class EventLoggingListener(
     val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
     val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
-    if (shouldOverwrite && fileSystem.exists(path)) {
+    if (shouldOverwrite && fileSystem.delete(path, true)) {
       logWarning(s"Event log $path already exists. Overwriting...")
-      if (!fileSystem.delete(path, true)) {
-        logWarning(s"Error deleting $path")
-      }
     }
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -301,12 +298,6 @@ private[spark] object EventLoggingListener extends Logging {
    * @return input stream that holds one JSON record per line.
    */
   def openEventLog(log: Path, fs: FileSystem): InputStream = {
-    // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
-    // IOException when a file does not exist, so try our best to throw a proper exception.
-    if (!fs.exists(log)) {
-      throw new FileNotFoundException(s"File $log does not exist.")
-    }
-
     val in = new BufferedInputStream(fs.open(log))
 
     // Compression codec is encoded as an extension, e.g. app_123.lzf
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 2f07395edf8d10f85b9ba6c7c5c039564dd71f45..df13b32451af2d7e154c276071edaed1c8dd40e2 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.repl
 
-import java.io.{ByteArrayOutputStream, FilterInputStream, InputStream, IOException}
+import java.io.{ByteArrayOutputStream, FileNotFoundException, FilterInputStream, InputStream, IOException}
 import java.net.{HttpURLConnection, URI, URL, URLEncoder}
 import java.nio.channels.Channels
 
@@ -147,10 +147,11 @@ class ExecutorClassLoader(
   private def getClassFileInputStreamFromFileSystem(fileSystem: FileSystem)(
       pathInDirectory: String): InputStream = {
     val path = new Path(directory, pathInDirectory)
-    if (fileSystem.exists(path)) {
+    try {
       fileSystem.open(path)
-    } else {
-      throw new ClassNotFoundException(s"Class file not found at path $path")
+    } catch {
+      case _: FileNotFoundException =>
+        throw new ClassNotFoundException(s"Class file not found at path $path")
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 3335755fd3b677b9a2a1cc725646a4e995c01c2f..bec966b15ed0f9a7ebc7673e112893bcf4a6e606 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.{DataInputStream, DataOutputStream, IOException}
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -171,7 +171,7 @@ private[state] class HDFSBackedStateStoreProvider(
       if (tempDeltaFileStream != null) {
         tempDeltaFileStream.close()
       }
-      if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
+      if (tempDeltaFile != null) {
         fs.delete(tempDeltaFile, true)
       }
       logInfo("Aborted")
@@ -278,14 +278,12 @@ private[state] class HDFSBackedStateStoreProvider(
 
   /** Initialize the store provider */
   private def initialize(): Unit = {
-    if (!fs.exists(baseDir)) {
+    try {
       fs.mkdirs(baseDir)
-    } else {
-      if (!fs.isDirectory(baseDir)) {
+    } catch {
+      case e: IOException =>
         throw new IllegalStateException(
-          s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
-            s"$baseDir already exists and is not a directory")
-      }
+          s"Cannot use ${id.checkpointLocation} for storing state data for $this: $e ", e)
     }
   }
 
@@ -340,13 +338,16 @@ private[state] class HDFSBackedStateStoreProvider(
 
   private def updateFromDeltaFile(version: Long, map: MapType): Unit = {
     val fileToRead = deltaFile(version)
-    if (!fs.exists(fileToRead)) {
-      throw new IllegalStateException(
-        s"Error reading delta file $fileToRead of $this: $fileToRead does not exist")
-    }
     var input: DataInputStream = null
+    val sourceStream = try {
+      fs.open(fileToRead)
+    } catch {
+      case f: FileNotFoundException =>
+        throw new IllegalStateException(
+          s"Error reading delta file $fileToRead of $this: $fileToRead does not exist", f)
+    }
     try {
-      input = decompressStream(fs.open(fileToRead))
+      input = decompressStream(sourceStream)
       var eof = false
 
       while(!eof) {
@@ -405,8 +406,6 @@ private[state] class HDFSBackedStateStoreProvider(
 
   private def readSnapshotFile(version: Long): Option[MapType] = {
     val fileToRead = snapshotFile(version)
-    if (!fs.exists(fileToRead)) return None
-
     val map = new MapType()
     var input: DataInputStream = null
 
@@ -443,6 +442,9 @@ private[state] class HDFSBackedStateStoreProvider(
       }
       logInfo(s"Read snapshot file for version $version of $this from $fileToRead")
       Some(map)
+    } catch {
+      case _: FileNotFoundException =>
+        None
     } finally {
       if (input != null) input.close()
     }
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index e73117c8144ce72d26c345422fdf97f38d98de97..061c7431a6362b31a6a45bd858a9e330ea84da1f 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -75,9 +75,7 @@ public class JavaMetastoreDataSourcesSuite {
     hiveManagedPath = new Path(
       catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
-    if (fs.exists(hiveManagedPath)){
-      fs.delete(hiveManagedPath, true);
-    }
+    fs.delete(hiveManagedPath, true);
 
     List<String> jsonObjects = new ArrayList<>(10);
     for (int i = 0; i < 10; i++) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c36b0275f416173f8b96244a8979fc78d4dc05e4..3892fe87e2a8095ee87aca85b12653c15d525e42 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -375,7 +375,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
       val filesystemPath = new Path(expectedPath)
       val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
-      if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
+      fs.delete(filesystemPath, true)
 
       // It is a managed table when we do not specify the location.
       sql(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 398fa6500f093c605323acc2ac518febfda0ba86..5cbad8bf3ce6e9d58c585ce6a2c1095f4b8e185f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -117,7 +117,7 @@ object Checkpoint extends Logging {
 
     val path = new Path(checkpointDir)
     val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
-    if (fs.exists(path)) {
+    try {
       val statuses = fs.listStatus(path)
       if (statuses != null) {
         val paths = statuses.map(_.getPath)
@@ -127,9 +127,10 @@ object Checkpoint extends Logging {
         logWarning(s"Listing $path returned null")
         Seq.empty
       }
-    } else {
-      logWarning(s"Checkpoint directory $path does not exist")
-      Seq.empty
+    } catch {
+      case _: FileNotFoundException =>
+        logWarning(s"Checkpoint directory $path does not exist")
+        Seq.empty
     }
   }
 
@@ -229,9 +230,7 @@ class CheckpointWriter(
           logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
 
           // Write checkpoint to temp file
-          if (fs.exists(tempFile)) {
-            fs.delete(tempFile, true)   // just in case it exists
-          }
+          fs.delete(tempFile, true) // just in case it exists
           val fos = fs.create(tempFile)
           Utils.tryWithSafeFinally {
             fos.write(bytes)
@@ -242,9 +241,7 @@ class CheckpointWriter(
           // If the checkpoint file exists, back it up
           // If the backup exists as well, just delete it, otherwise rename will fail
           if (fs.exists(checkpointFile)) {
-            if (fs.exists(backupFile)) {
-              fs.delete(backupFile, true) // just in case it exists
-            }
+            fs.delete(backupFile, true) // just in case it exists
             if (!fs.rename(checkpointFile, backupFile)) {
               logWarning(s"Could not rename $checkpointFile to $backupFile")
             }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9b689f01b8d3995f1db84c54bc6977fc08bfc9cb..845f554308c431a2e2db0fb2b5c4719ed1be7121 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.streaming.util
 
+import java.io.FileNotFoundException
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
 import java.util.concurrent.RejectedExecutionException
@@ -231,13 +232,25 @@ private[streaming] class FileBasedWriteAheadLog(
     val logDirectoryPath = new Path(logDirectory)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
 
-    if (fileSystem.exists(logDirectoryPath) &&
-        fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
-      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
-      pastLogs.clear()
-      pastLogs ++= logFileInfo
-      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
-      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+    try {
+      // If you call listStatus(file) it returns a stat of the file in the array,
+      // rather than an array listing all the children.
+      // This makes it hard to differentiate listStatus(file) and
+      // listStatus(dir-with-one-child) except by examining the name of the returned status,
+      // and once you've got symlinks in the mix that differentiation isn't easy.
+      // Checking for the path being a directory is one more call to the filesystem, but
+      // leads to much clearer code.
+      if (fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
+        val logFileInfo = logFilesTologInfo(
+          fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+        pastLogs.clear()
+        pastLogs ++= logFileInfo
+        logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+        logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+      }
+    } catch {
+      case _: FileNotFoundException =>
+        // there is no log directory, hence nothing to recover
     }
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 13a765d035ee857fbc8962ee0f300825ca9f0e46..6a3b3200dccdbfecb2a05536558a28d0c38a22e7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.streaming.util
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
@@ -44,18 +44,16 @@ private[streaming] object HdfsUtils {
   def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
-    if (dfs.isFile(dfsPath)) {
-      try {
-        dfs.open(dfsPath)
-      } catch {
-        case e: IOException =>
-          // If we are really unlucky, the file may be deleted as we're opening the stream.
-          // This can happen as clean up is performed by daemon threads that may be left over from
-          // previous runs.
-          if (!dfs.isFile(dfsPath)) null else throw e
-      }
-    } else {
-      null
+    try {
+      dfs.open(dfsPath)
+    } catch {
+      case _: FileNotFoundException =>
+        null
+      case e: IOException =>
+        // If we are really unlucky, the file may be deleted as we're opening the stream.
+        // This can happen as clean up is performed by daemon threads that may be left over from
+        // previous runs.
+        if (!dfs.isFile(dfsPath)) null else throw e
     }
   }
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e3572d781b0db13822fc194f586a12e75d94838b..93684005f1cc032a008c64e68a4726d0cd9e833b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -189,9 +189,8 @@ private[spark] class Client(
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       val fs = stagingDirPath.getFileSystem(hadoopConf)
-      if (!preserveFiles && fs.exists(stagingDirPath)) {
-        logInfo("Deleting staging directory " + stagingDirPath)
-        fs.delete(stagingDirPath, true)
+      if (!preserveFiles && fs.delete(stagingDirPath, true)) {
+        logInfo(s"Deleted staging directory $stagingDirPath")
       }
     } catch {
       case ioe: IOException =>