diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 31312fb064b1597d50a0fa9810167bfc209f318a..d9fecc5e3011e448aaf0fcc9688f4845e1f6cc74 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions import scala.xml.Node -import org.eclipse.jetty.server.{Connector, Request, Server} +import org.eclipse.jetty.server.{AbstractConnector, Connector, Request, Server} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.server.nio.SelectChannelConnector import org.eclipse.jetty.server.ssl.SslSelectChannelConnector @@ -271,9 +271,24 @@ private[spark] object JettyUtils extends Logging { gzipHandlers.foreach(collection.addHandler) connectors.foreach(_.setHost(hostName)) + // As each acceptor and each selector will use one thread, the number of threads should at + // least be the number of acceptors and selectors plus 1. (See SPARK-13776) + var minThreads = 1 + connectors.foreach { c => + // Currently we only use "SelectChannelConnector" + val connector = c.asInstanceOf[SelectChannelConnector] + // Limit the max acceptor number to 8 so that we don't waste a lot of threads + connector.setAcceptors(math.min(connector.getAcceptors, 8)) + // The number of selectors always equals to the number of acceptors + minThreads += connector.getAcceptors * 2 + } server.setConnectors(connectors.toArray) val pool = new QueuedThreadPool + if (serverName.nonEmpty) { + pool.setName(serverName) + } + pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads)) pool.setDaemon(true) server.setThreadPool(pool) val errorHandler = new ErrorHandler()