Skip to content
Snippets Groups Projects
Commit 12901643 authored by Reynold Xin's avatar Reynold Xin Committed by Matei Zaharia
Browse files

[SPARK-2704] Name threads in ConnectionManager and mark them as daemon.

handleMessageExecutor, handleReadWriteExecutor, and handleConnectExecutor are not marked as daemon and not named. I think there exists some condition in which Spark programs won't terminate because of this.

Stack dump attached in https://issues.apache.org/jira/browse/SPARK-2704

Author: Reynold Xin <rxin@apache.org>

Closes #1604 from rxin/daemon and squashes the following commits:

98d6a6c [Reynold Xin] [SPARK-2704] Name threads in ConnectionManager and mark them as daemon.
parent c183b92c
No related branches found
No related tags found
No related merge requests found
@@ -62,13 +62,15 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
    conf.getInt("spark.core.connection.handler.threads.min", 20),
    conf.getInt("spark.core.connection.handler.threads.max", 60),
    conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
    new LinkedBlockingDeque[Runnable](),
    Utils.namedThreadFactory("handle-message-executor"))

  private val handleReadWriteExecutor = new ThreadPoolExecutor(
    conf.getInt("spark.core.connection.io.threads.min", 4),
    conf.getInt("spark.core.connection.io.threads.max", 32),
    conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
    new LinkedBlockingDeque[Runnable](),
    Utils.namedThreadFactory("handle-read-write-executor"))

  // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
  // which should be executed asap
@@ -76,7 +78,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
    conf.getInt("spark.core.connection.connect.threads.min", 1),
    conf.getInt("spark.core.connection.connect.threads.max", 8),
    conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
    new LinkedBlockingDeque[Runnable](),
    Utils.namedThreadFactory("handle-connect-executor"))

  private val serverChannel = ServerSocketChannel.open()
  // used to track the SendingConnections waiting to do SASL negotiation
......
@@ -21,7 +21,7 @@
import java.io._
import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection}
import java.nio.ByteBuffer
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}

import scala.collection.JavaConversions._
import scala.collection.Map
@@ -553,19 +553,19 @@ private[spark] object Utils extends Logging {
    new ThreadFactoryBuilder().setDaemon(true)
  /**
   * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
   */
  def namedThreadFactory(prefix: String): ThreadFactory = {
    daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
  }

  /**
   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
   * unique, sequentially assigned integer.
   */
  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
    val threadFactory = namedThreadFactory(prefix)
    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
  }
/** /**
@@ -573,10 +573,17 @@ private[spark] object Utils extends Logging {
   * unique, sequentially assigned integer.
   */
  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
    val threadFactory = namedThreadFactory(prefix)
    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
  }

  /**
   * Return the string to tell how long has passed in milliseconds.
   */
  def getUsedTimeMs(startTimeMs: Long): String = {
    " " + (System.currentTimeMillis - startTimeMs) + " ms"
  }
  private def listFilesSafely(file: File): Seq[File] = {
    val files = file.listFiles()
    if (files == null) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment