Commit dd681f50 authored by Mridul Muralidharan, committed by Aaron Davidson

SPARK-1587 Fix thread leak

mvn test fails (intermittently) due to a thread leak: ScalaTest runs all tests in the same JVM, so threads that a finished SparkContext leaves running accumulate across suites.

Author: Mridul Muralidharan <mridulm80@apache.org>

Closes #504 from mridulm/resource_leak_fixes and squashes the following commits:

a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests : cleanup all threads when SparkContext.stop is invoked. Causes tests to fail
7b5e19c [Mridul Muralidharan] Prevent NPE while running tests
parent bb68f477
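
For context on the mechanism: any thread a SparkContext starts and never terminates stays alive into later suites of the shared test JVM. A minimal sketch of that failure mode, with hypothetical names rather than Spark code, since a java.util.Timer keeps its worker thread alive until cancel() is called:

import java.util.{Timer, TimerTask}

// Hypothetical component: starts a periodic timer the way a scheduler might,
// but (like the pre-patch code) its thread only ends when stop() is called.
class LeakyComponent(id: Int) {
  private val timer = new Timer(s"leaky-timer-$id") // one non-daemon worker thread
  timer.scheduleAtFixedRate(new TimerTask {
    override def run(): Unit = () // periodic work would go here
  }, 100L, 100L)

  def stop(): Unit = timer.cancel() // terminates the worker thread
}

object LeakDemo extends App {
  val before = Thread.activeCount()
  val comps = (1 to 5).map(i => new LeakyComponent(i))
  Thread.sleep(200L)
  println(s"live threads grew by ${Thread.activeCount() - before}") // ~5
  comps.foreach(_.stop()) // without this, five timer threads outlive the "test"
}

The diffs below apply that stop()-cancels-everything discipline to each Spark component that owns such a thread.
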
core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

@@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: String,
     sinkConfigs.foreach { kv =>
       val classPath = kv._2.getProperty("class")
-      try {
-        val sink = Class.forName(classPath)
-          .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-          .newInstance(kv._2, registry, securityMgr)
-        if (kv._1 == "servlet") {
-          metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
-        } else {
-          sinks += sink.asInstanceOf[Sink]
+      if (null != classPath) {
+        try {
+          val sink = Class.forName(classPath)
+            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+            .newInstance(kv._2, registry, securityMgr)
+          if (kv._1 == "servlet") {
+            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+          } else {
+            sinks += sink.asInstanceOf[Sink]
+          }
+        } catch {
+          case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
         }
-      } catch {
-        case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
       }
     }
   }

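This hunk is the "Prevent NPE" half of the patch: a sink section in the metrics config with no class property makes classPath null, and Class.forName(null) throws a NullPointerException. A hedged sketch of the failure and the added guard, using a bare java.util.Properties rather than Spark's config plumbing:

import java.util.Properties

object NullClassPathDemo extends App {
  // A sink section whose "class" key is missing, as a sparse metrics config yields.
  val sinkProps = new Properties()
  val classPath = sinkProps.getProperty("class") // null: the key is absent

  // Class.forName(classPath) at this point would throw NullPointerException.
  if (null != classPath) { // the added guard: skip such entries entirely
    val sink = Class.forName(classPath)
    println(s"loaded sink class ${sink.getName}")
  } else {
    println("no class configured for this sink; skipping")
  }
}
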
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

@@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl(
     if (taskResultGetter != null) {
       taskResultGetter.stop()
     }
+    starvationTimer.cancel()
     // sleeping for an arbitrary 1 seconds to ensure that messages are sent out.
     Thread.sleep(1000L)

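One detail worth noting about this one-line fix: java.util.Timer.cancel() discards pending tasks and lets the timer's worker thread terminate, and repeated calls are no-ops, so TaskSchedulerImpl.stop() remains safe to call more than once. A small sketch (timer name and task are illustrative):

import java.util.{Timer, TimerTask}

object CancelDemo extends App {
  val starvationTimer = new Timer("demo-starvation-timer")
  starvationTimer.schedule(new TimerTask {
    override def run(): Unit = println("no resource offers received yet")
  }, 15000L)

  starvationTimer.cancel() // worker thread exits; the pending task never fires
  starvationTimer.cancel() // a second cancel() is a harmless no-op
}
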
core/src/main/scala/org/apache/spark/storage/BlockManager.scala

@@ -1021,6 +1021,8 @@ private[spark] class BlockManager(
       heartBeatTask.cancel()
     }
     connectionManager.stop()
+    shuffleBlockManager.stop()
+    diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()

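The two added calls make BlockManager.stop() tear down the components it owns instead of leaving their timers and local directories to the JVM's lifetime. A hedged stub sketch of that ownership shape; the types and messages are placeholders, not Spark's classes:

// Stub components standing in for ShuffleBlockManager and DiskBlockManager.
class ShuffleStub { def stop(): Unit = println("shuffle cleanup timer cancelled") }
class DiskStub    { def stop(): Unit = println("local dirs deleted, sender stopped") }

// The owner stops dependents in the order the hunk uses: the shuffle manager
// (which cancels its cleanup timer) before the disk manager (which deletes
// the directories that cleanup would otherwise touch).
class OwnerStub(shuffle: ShuffleStub, disk: DiskStub) {
  def stop(): Unit = {
    shuffle.stop()
    disk.stop()
  }
}

object OwnershipDemo extends App {
  new OwnerStub(new ShuffleStub, new DiskStub).stop()
}
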
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

@@ -150,20 +150,26 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String)
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
       override def run() {
         logDebug("Shutdown hook called")
-        localDirs.foreach { localDir =>
-          try {
-            if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
-          } catch {
-            case t: Throwable =>
-              logError("Exception while deleting local spark dir: " + localDir, t)
-          }
-        }
-        if (shuffleSender != null) {
-          shuffleSender.stop()
-        }
+        stop()
       }
     })
   }
 
+  private[spark] def stop() {
+    localDirs.foreach { localDir =>
+      if (localDir.isDirectory() && localDir.exists()) {
+        try {
+          if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+        } catch {
+          case t: Throwable =>
+            logError("Exception while deleting local spark dir: " + localDir, t)
+        }
+      }
+    }
+
+    if (shuffleSender != null) {
+      shuffleSender.stop()
+    }
+  }
+
   private[storage] def startShuffleBlockSender(port: Int): Int = {

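The refactor above extracts the shutdown hook's body into stop(), so the same cleanup runs whether the JVM exits or BlockManager.stop() calls it explicitly, and the new isDirectory/exists guard makes a second pass harmless. A self-contained sketch of the pattern, where deleteRecursively stands in for Spark's Utils.deleteRecursively:

import java.io.File

class LocalDirCleaner(localDirs: Seq[File]) {
  // The shutdown hook delegates to stop(), exactly as the refactored hunk does.
  Runtime.getRuntime.addShutdownHook(new Thread("delete local dirs") {
    override def run(): Unit = stop()
  })

  def stop(): Unit = {
    localDirs.foreach { localDir =>
      // The guard makes stop() idempotent: if it already ran explicitly,
      // the shutdown hook's second pass finds nothing and does nothing.
      if (localDir.isDirectory() && localDir.exists()) {
        try deleteRecursively(localDir)
        catch {
          case t: Throwable => System.err.println(s"failed to delete $localDir: $t")
        }
      }
    }
  }

  // Stand-in for Spark's Utils.deleteRecursively.
  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}
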
core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala

@@ -207,6 +207,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private def cleanup(cleanupTime: Long) {
     shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
   }
+
+  def stop() {
+    metadataCleaner.cancel()
+  }
 }
 
 private[spark]

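metadataCleaner here is Spark's MetadataCleaner, a periodic task scheduled on a java.util.Timer; without the new stop(), that timer thread outlives the context. A simplified sketch of a cleaner in that shape (not the real MetadataCleaner API):

import java.util.{Timer, TimerTask}

// Simplified MetadataCleaner-like helper: runs cleanup(cutoff) periodically
// on a private timer thread until cancel() is called.
class PeriodicCleaner(name: String, periodMs: Long)(cleanup: Long => Unit) {
  private val timer = new Timer(name, true) // daemon, so the sketch can exit
  timer.scheduleAtFixedRate(new TimerTask {
    override def run(): Unit = cleanup(System.currentTimeMillis() - periodMs)
  }, periodMs, periodMs)

  def cancel(): Unit = timer.cancel() // what the new stop() invokes
}

object CleanerDemo extends App {
  val cleaner = new PeriodicCleaner("demo-metadata-cleaner", 100L)(cutoff =>
    println(s"dropping shuffle state older than $cutoff"))
  Thread.sleep(350L)
  cleaner.cancel() // the timer thread ends; nothing leaks into the next test
}
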
core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

@@ -195,6 +195,7 @@ private[spark] object JettyUtils extends Logging {
         (server, server.getConnectors.head.getLocalPort)
       case f: Failure[_] =>
         server.stop()
+        pool.stop()
         logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
         logInfo("Error was: " + f.toString)
         connect((currentPort + 1) % 65536)

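pool is the QueuedThreadPool created for this connect attempt; if server.start() fails, stopping only the server can leave the pool's threads running, hence the added pool.stop(). A hedged sketch against the Jetty 8-era API Spark depended on at the time (port and setup are placeholders):

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.util.thread.QueuedThreadPool

object JettyPoolStopDemo extends App {
  val pool = new QueuedThreadPool
  pool.setDaemon(true)

  val server = new Server(8080) // placeholder port
  server.setThreadPool(pool)    // Jetty 8 API; Jetty 9+ wires the pool differently

  try {
    server.start()
    server.stop()
  } catch {
    case e: Exception =>
      server.stop()
      pool.stop() // the added call: server.stop() alone can strand pool threads
  }
}
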
core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala

@@ -53,6 +53,11 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
     shuffleBlockManager.idToSegmentMap.clear()
   }
 
+  override def afterEach() {
+    diskBlockManager.stop()
+    shuffleBlockManager.idToSegmentMap.clear()
+  }
+
   test("basic block creation") {
     val blockId = new TestBlockId("test")
     assertSegmentEquals(blockId, blockId.name, 0, 0)

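The suite change applies the same discipline to tests: afterEach releases the per-test manager so its threads and temp files do not bleed into whichever suite the shared JVM runs next. A minimal sketch of the pattern with ScalaTest's BeforeAndAfterEach, using the FunSuite-era API this suite itself uses; the resource is a placeholder:

import org.scalatest.{BeforeAndAfterEach, FunSuite}

class CleanShutdownSuite extends FunSuite with BeforeAndAfterEach {
  private var resource: AutoCloseable = _

  override def beforeEach() {
    resource = new AutoCloseable { def close(): Unit = () } // stand-in manager
  }

  override def afterEach() {
    resource.close() // mirrors diskBlockManager.stop() in the real suite
  }

  test("the resource is available inside each test") {
    assert(resource != null)
  }
}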