diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index c5bda2078fc149aa4012d82f5f4b40a3d8be1f92..651511da1b7feb067ac133a01077d6ceeb2952ba 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: String,
 
     sinkConfigs.foreach { kv =>
       val classPath = kv._2.getProperty("class")
-      try {
-        val sink = Class.forName(classPath)
-          .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-          .newInstance(kv._2, registry, securityMgr)
-        if (kv._1 == "servlet") {
-           metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
-        } else {
-          sinks += sink.asInstanceOf[Sink]
+      if (classPath != null) {
+        try {
+          val sink = Class.forName(classPath)
+            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+            .newInstance(kv._2, registry, securityMgr)
+          if (kv._1 == "servlet") {
+            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+          } else {
+            sinks += sink.asInstanceOf[Sink]
+          }
+        } catch {
+          case e: Exception => logError("Sink class " + classPath + " cannot be instantiated", e)
         }
-      } catch {
-        case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
       }
     }
   }
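Note: the hunk above makes sink construction tolerant of config entries that have no `class` property, so they are skipped instead of being passed as `null` to `Class.forName`. A minimal, self-contained sketch of that guarded reflective-instantiation pattern; `DemoSink`, `ConsoleDemoSink`, and `buildSinks` are illustrative stand-ins, not Spark classes:

```scala
import java.util.Properties

object ReflectiveSinkSketch {
  // Hypothetical sink abstraction used only for this example.
  trait DemoSink { def report(): Unit }
  class ConsoleDemoSink(props: Properties) extends DemoSink {
    override def report(): Unit = println("period=" + props.getProperty("period", "10"))
  }

  def buildSinks(configs: Map[String, Properties]): Seq[DemoSink] =
    configs.values.toSeq.flatMap { props =>
      val classPath = props.getProperty("class")
      if (classPath == null) {
        None                                  // tolerate entries without a class property
      } else {
        try {
          Some(Class.forName(classPath)
            .getConstructor(classOf[Properties])
            .newInstance(props)
            .asInstanceOf[DemoSink])
        } catch {
          case e: Exception =>
            println("Sink class " + classPath + " cannot be instantiated: " + e)
            None
        }
      }
    }
}
```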
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index be19d9b8854c85daa6f3034278912d72f5aa80c9..5a68f38bc5844f941788c8382fdd00048646ed8a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl(
     if (taskResultGetter != null) {
       taskResultGetter.stop()
     }
+    starvationTimer.cancel()
 
     // sleeping for an arbitrary 1 seconds to ensure that messages are sent out.
     Thread.sleep(1000L)
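Note: the one added line cancels the starvation-check timer when the scheduler stops. A minimal sketch, assuming a `java.util.Timer`-backed periodic check (the `StarvationWatcher` name and logging are illustrative), of why a long-lived service should cancel its timer in its stop path:

```scala
import java.util.{Timer, TimerTask}

class StarvationWatcher {
  private val timer = new Timer("starvation-check", true)   // daemon timer thread

  def start(): Unit =
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = println("checking for starved tasks...")
    }, 1000L, 1000L)

  // Without cancel(), the timer thread and any pending tasks outlive the owner.
  def stop(): Unit = timer.cancel()
}
```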
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f15fa4dd7ffd5f36edb1412d8820863cb30c2976..ccd5c5320abe5fbad59d0a05c5867f8559f62433 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1021,6 +1021,8 @@ private[spark] class BlockManager(
       heartBeatTask.cancel()
     }
     connectionManager.stop()
+    shuffleBlockManager.stop()
+    diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()
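Note: these two calls make `BlockManager.stop()` propagate shutdown to the helpers it owns before clearing its own state. A rough sketch of that composite-stop ordering, with purely illustrative names:

```scala
trait Stoppable { def stop(): Unit }

class ParentManager(children: Seq[Stoppable]) extends Stoppable {
  private val blockInfo = scala.collection.mutable.Map.empty[String, Long]

  override def stop(): Unit = {
    // Stop owned components first; keep going even if one of them fails.
    children.foreach { child =>
      try child.stop()
      catch { case e: Exception => println("Error stopping " + child + ": " + e) }
    }
    blockInfo.clear()   // only then clear the parent's own state
  }
}
```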
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 7a24c8f57f43b7cc4e7092245354dc72f3d3f0db..054f66a8b726027dc0c8f459bf7f89b4fad64f24 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -150,20 +150,26 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
       override def run() {
         logDebug("Shutdown hook called")
-        localDirs.foreach { localDir =>
-          try {
-            if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
-          } catch {
-            case t: Throwable =>
-              logError("Exception while deleting local spark dir: " + localDir, t)
-          }
-        }
+        stop()
+      }
+    })
+  }
 
-        if (shuffleSender != null) {
-          shuffleSender.stop()
+  private[spark] def stop() {
+    localDirs.foreach { localDir =>
+      if (localDir.isDirectory() && localDir.exists()) {
+        try {
+          if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+        } catch {
+          case t: Throwable =>
+            logError("Exception while deleting local spark dir: " + localDir, t)
         }
       }
-    })
+    }
+
+    if (shuffleSender != null) {
+      shuffleSender.stop()
+    }
   }
 
   private[storage] def startShuffleBlockSender(port: Int): Int = {
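Note: the refactor lifts the shutdown-hook body into a reusable `stop()`, so the same cleanup can be invoked directly (from `BlockManager.stop()` above and from tests) as well as from the JVM hook. A condensed sketch of that shape, with illustrative names and a simplified recursive delete:

```scala
import java.io.File

class LocalDirCleaner(localDirs: Seq[File]) {
  // The JVM shutdown hook just delegates to stop(), so the same cleanup
  // can also be driven explicitly during normal shutdown or from tests.
  Runtime.getRuntime.addShutdownHook(new Thread("delete local dirs") {
    override def run(): Unit = stop()
  })

  def stop(): Unit = localDirs.foreach { dir =>
    if (dir.isDirectory()) {    // skip dirs that were already removed elsewhere
      try deleteRecursively(dir)
      catch { case t: Throwable => println("Exception while deleting local dir " + dir + ": " + t) }
    }
  }

  private def deleteRecursively(f: File): Unit = {
    val children = f.listFiles()
    if (children != null) children.foreach(deleteRecursively)
    f.delete()
  }
}
```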
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 4cd4cdbd9909db2c6c644c1879c4620e46ade198..35910e552fe86d4b8ce6c0a6c14765441669b286 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -207,6 +207,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private def cleanup(cleanupTime: Long) {
     shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
   }
+
+  def stop() {
+    metadataCleaner.cancel()
+  }
 }
 
 private[spark]
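Note: the new `stop()` only needs to cancel the metadata cleaner, so its background cleanup task does not outlive the owning BlockManager. A small illustrative sketch (not the real MetadataCleaner) of a timer-backed TTL cleaner that supports cancellation:

```scala
import java.util.{Timer, TimerTask}
import scala.collection.mutable

class TtlCleanerSketch(ttlMs: Long) {
  private val insertTimes = mutable.Map.empty[Int, Long]    // shuffleId -> insert time
  private val timer = new Timer("metadata-cleaner", true)   // daemon timer thread

  timer.scheduleAtFixedRate(new TimerTask {
    override def run(): Unit = insertTimes.synchronized {
      val cutoff = System.currentTimeMillis() - ttlMs
      val stale = insertTimes.collect { case (id, ts) if ts < cutoff => id }.toList
      stale.foreach(insertTimes.remove)                     // drop expired shuffle state
    }
  }, ttlMs, ttlMs)

  def register(shuffleId: Int): Unit =
    insertTimes.synchronized { insertTimes(shuffleId) = System.currentTimeMillis() }

  def stop(): Unit = timer.cancel()   // mirrors what the added stop() above does
}
```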
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 750f5a501c213fccbe5155ec398815c861fafa13..fdeb15b5d058a52dfeb467ecde43d6ae01064906 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -195,6 +195,7 @@ private[spark] object JettyUtils extends Logging {
           (server, server.getConnectors.head.getLocalPort)
         case f: Failure[_] =>
           server.stop()
+          pool.stop()
           logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
           logInfo("Error was: " + f.toString)
           connect((currentPort + 1) % 65536)
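Note: without the added `pool.stop()`, each failed bind attempt would leave the thread pool created for that attempt running while the loop retries the next port. A library-agnostic sketch of the retry loop's resource handling; `DemoServer` and the executor pool stand in for the real server/thread-pool pair:

```scala
import java.net.ServerSocket
import java.util.concurrent.{ExecutorService, Executors}
import scala.util.{Failure, Success, Try}

object PortRetrySketch {
  // Stand-in for the server + worker-pool pair created per attempt.
  class DemoServer(val pool: ExecutorService) {
    private var socket: Option[ServerSocket] = None
    def bind(port: Int): Unit = { socket = Some(new ServerSocket(port)) }
    def stop(): Unit = socket.foreach(_.close())
  }

  def connect(currentPort: Int): (DemoServer, Int) = {
    val server = new DemoServer(Executors.newFixedThreadPool(4))
    Try(server.bind(currentPort)) match {
      case Success(_) =>
        (server, currentPort)
      case Failure(e) =>
        server.stop()
        server.pool.shutdown()   // analogue of the added pool.stop(): don't leak this attempt's threads
        println("Failed to bind port " + currentPort + " (" + e + "), trying the next one")
        connect((currentPort + 1) % 65536)
    }
  }
}
```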
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 9b29e2a8a55deaae4d4fdf9e9328de9d8daab6c5..42bfbf1bdfc742d05d5bf00f1f49e0d419eaee4a 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -53,6 +53,11 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
     shuffleBlockManager.idToSegmentMap.clear()
   }
 
+  override def afterEach() {
+    diskBlockManager.stop()
+    shuffleBlockManager.idToSegmentMap.clear()
+  }
+
   test("basic block creation") {
     val blockId = new TestBlockId("test")
     assertSegmentEquals(blockId, blockId.name, 0, 0)
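Note: with `stop()` exposed on the managers, the suite can tear them down deterministically after every test instead of depending on a JVM shutdown hook. A minimal ScalaTest sketch of that pattern, in the same `FunSuite with BeforeAndAfterEach` style the suite already uses; `ManagedResource` is a stand-in, not a Spark class:

```scala
import org.scalatest.{BeforeAndAfterEach, FunSuite}

class ManagedResource {
  var stopped = false
  def stop() { stopped = true }
}

class ResourceCleanupSuite extends FunSuite with BeforeAndAfterEach {
  var resource: ManagedResource = _

  override def beforeEach() {
    resource = new ManagedResource
  }

  override def afterEach() {
    resource.stop()   // runs after every test, pass or fail, so state never leaks between tests
  }

  test("each test sees a fresh, un-stopped resource") {
    assert(!resource.stopped)
  }
}
```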