diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index e48817ebbafddb2d96d1d2799408e226f359d5bb..00b9d1af373dbfc6a495b20825811c67611084be 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -62,8 +62,8 @@ private[deploy] class Worker(
   private val forwordMessageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
 
-  // A separated thread to clean up the workDir. Used to provide the implicit parameter of `Future`
-  // methods.
+  // A separated thread to clean up the workDir and the directories of finished applications.
+  // Used to provide the implicit parameter of `Future` methods.
   private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
 
@@ -578,10 +578,15 @@ private[deploy] class Worker(
     if (shouldCleanup) {
       finishedApps -= id
       appDirectories.remove(id).foreach { dirList =>
-        logInfo(s"Cleaning up local directories for application $id")
-        dirList.foreach { dir =>
-          Utils.deleteRecursively(new File(dir))
-        }
+        concurrent.Future {
+          logInfo(s"Cleaning up local directories for application $id")
+          dirList.foreach { dir =>
+            Utils.deleteRecursively(new File(dir))
+          }
+        }(cleanupThreadExecutor).onFailure {
+          case e: Throwable =>
+            logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+        }(cleanupThreadExecutor)
       }
       shuffleService.applicationRemoved(id)
     }