Skip to content
Snippets Groups Projects
Commit 42157027 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

A few bug fixes and a unit test

parent a4248138
No related branches found
No related tags found
No related merge requests found
...@@ -578,6 +578,7 @@ class SparkContext( ...@@ -578,6 +578,7 @@ class SparkContext(
/** Shut down the SparkContext. */ /** Shut down the SparkContext. */
def stop() { def stop() {
ui.stop()
// Do this only if not stopped already - best case effort. // Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once. // prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler val dagSchedulerCopy = dagScheduler
......
...@@ -36,6 +36,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -36,6 +36,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
var firstApp: Option[ApplicationInfo] = None var firstApp: Option[ApplicationInfo] = None
val webUi = new MasterWebUI(self)
Utils.checkHost(host, "Expected hostname") Utils.checkHost(host, "Expected hostname")
val masterPublicAddress = { val masterPublicAddress = {
...@@ -52,13 +54,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -52,13 +54,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
logInfo("Starting Spark master at spark://" + host + ":" + port) logInfo("Starting Spark master at spark://" + host + ":" + port)
// Listen for remote client disconnection events, since they don't go through Akka's watch() // Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
startWebUi() webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers()) context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
} }
def startWebUi() { override def postStop() {
val webUi = new MasterWebUI(self) webUi.stop()
webUi.start()
} }
override def receive = { override def receive = {
......
...@@ -5,7 +5,7 @@ import akka.util.Duration ...@@ -5,7 +5,7 @@ import akka.util.Duration
import javax.servlet.http.HttpServletRequest import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler import org.eclipse.jetty.server.{Handler, Server}
import spark.{Logging, Utils} import spark.{Logging, Utils}
import spark.ui.JettyUtils import spark.ui.JettyUtils
...@@ -21,13 +21,15 @@ class MasterWebUI(val master: ActorRef) extends Logging { ...@@ -21,13 +21,15 @@ class MasterWebUI(val master: ActorRef) extends Logging {
val host = Utils.localHostName() val host = Utils.localHostName()
val port = Option(System.getProperty("master.ui.port")) val port = Option(System.getProperty("master.ui.port"))
.getOrElse(MasterWebUI.DEFAULT_PORT).toInt .getOrElse(MasterWebUI.DEFAULT_PORT).toInt
var server: Option[Server] = None
val applicationPage = new ApplicationPage(this) val applicationPage = new ApplicationPage(this)
val indexPage = new IndexPage(this) val indexPage = new IndexPage(this)
def start() { def start() {
try { try {
val (server, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers) val (srv, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
server = Some(srv)
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
} catch { } catch {
case e: Exception => case e: Exception =>
...@@ -42,6 +44,10 @@ class MasterWebUI(val master: ActorRef) extends Logging { ...@@ -42,6 +44,10 @@ class MasterWebUI(val master: ActorRef) extends Logging {
("/app", (request: HttpServletRequest) => applicationPage.render(request)), ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
("*", (request: HttpServletRequest) => indexPage.render(request)) ("*", (request: HttpServletRequest) => indexPage.render(request))
) )
def stop() {
server.foreach(_.stop())
}
} }
object MasterWebUI { object MasterWebUI {
......
...@@ -45,6 +45,7 @@ private[spark] class Worker( ...@@ -45,6 +45,7 @@ private[spark] class Worker(
val envVar = System.getenv("SPARK_PUBLIC_DNS") val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host if (envVar != null) envVar else host
} }
val webUi = new WorkerWebUI(self, workDir)
var coresUsed = 0 var coresUsed = 0
var memoryUsed = 0 var memoryUsed = 0
...@@ -77,7 +78,7 @@ private[spark] class Worker( ...@@ -77,7 +78,7 @@ private[spark] class Worker(
logInfo("Spark home: " + sparkHome) logInfo("Spark home: " + sparkHome)
createWorkDir() createWorkDir()
connectToMaster() connectToMaster()
startWebUi() webUi.start()
} }
def connectToMaster() { def connectToMaster() {
...@@ -88,11 +89,6 @@ private[spark] class Worker( ...@@ -88,11 +89,6 @@ private[spark] class Worker(
context.watch(master) // Doesn't work with remote actors, but useful for testing context.watch(master) // Doesn't work with remote actors, but useful for testing
} }
def startWebUi() {
val webUi = new WorkerWebUI(self, workDir)
webUi.start()
}
override def receive = { override def receive = {
case RegisteredWorker(url) => case RegisteredWorker(url) =>
masterWebUiUrl = url masterWebUiUrl = url
...@@ -163,6 +159,7 @@ private[spark] class Worker( ...@@ -163,6 +159,7 @@ private[spark] class Worker(
override def postStop() { override def postStop() {
executors.values.foreach(_.kill()) executors.values.foreach(_.kill())
webUi.stop()
} }
} }
......
...@@ -7,7 +7,7 @@ import java.io.File ...@@ -7,7 +7,7 @@ import java.io.File
import javax.servlet.http.HttpServletRequest import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler import org.eclipse.jetty.server.{Handler, Server}
import scala.io.Source import scala.io.Source
...@@ -25,6 +25,7 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging { ...@@ -25,6 +25,7 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
val host = Utils.localHostName() val host = Utils.localHostName()
val port = Option(System.getProperty("wroker.ui.port")) val port = Option(System.getProperty("wroker.ui.port"))
.getOrElse(WorkerWebUI.DEFAULT_PORT).toInt .getOrElse(WorkerWebUI.DEFAULT_PORT).toInt
var server: Option[Server] = None
val indexPage = new IndexPage(this) val indexPage = new IndexPage(this)
...@@ -37,7 +38,8 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging { ...@@ -37,7 +38,8 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
def start() { def start() {
try { try {
val (server, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers) val (srv, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
server = Some(srv)
logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort)) logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
} catch { } catch {
case e: Exception => case e: Exception =>
...@@ -56,6 +58,10 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging { ...@@ -56,6 +58,10 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
source.close() source.close()
lines lines
} }
def stop() {
server.foreach(_.stop())
}
} }
object WorkerWebUI { object WorkerWebUI {
......
...@@ -12,7 +12,8 @@ import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHa ...@@ -12,7 +12,8 @@ import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHa
import scala.util.{Try, Success, Failure} import scala.util.{Try, Success, Failure}
import scala.xml.Node import scala.xml.Node
import spark.{Utils, SparkContext, Logging} import spark.{SparkContext, Logging}
import org.eclipse.jetty.util.log.Log
/** Utilities for launching a web server using Jetty's HTTP Server class */ /** Utilities for launching a web server using Jetty's HTTP Server class */
private[spark] object JettyUtils extends Logging { private[spark] object JettyUtils extends Logging {
...@@ -91,12 +92,14 @@ private[spark] object JettyUtils extends Logging { ...@@ -91,12 +92,14 @@ private[spark] object JettyUtils extends Logging {
@tailrec @tailrec
def connect(currentPort: Int): (Server, Int) = { def connect(currentPort: Int): (Server, Int) = {
val server = new Server(port) val server = new Server(currentPort)
server.setHandler(handlerList) server.setHandler(handlerList)
Try { server.start() } match { Try { server.start() } match {
case s: Success[_] => (server, currentPort) case s: Success[_] => (server, currentPort)
case f: Failure[_] => case f: Failure[_] =>
server.stop()
logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort)) logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
logInfo("Error was: " + f.toString)
connect((currentPort + 1) % 65536) connect((currentPort + 1) % 65536)
} }
} }
......
...@@ -2,7 +2,7 @@ package spark.ui ...@@ -2,7 +2,7 @@ package spark.ui
import javax.servlet.http.HttpServletRequest import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler import org.eclipse.jetty.server.{Handler, Server}
import spark.{Logging, SparkContext, Utils} import spark.{Logging, SparkContext, Utils}
import spark.ui.storage.BlockManagerUI import spark.ui.storage.BlockManagerUI
...@@ -17,6 +17,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { ...@@ -17,6 +17,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
val host = Utils.localHostName() val host = Utils.localHostName()
val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt
var boundPort: Option[Int] = None var boundPort: Option[Int] = None
var server: Option[Server] = None
val handlers = Seq[(String, Handler)]( val handlers = Seq[(String, Handler)](
("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)), ("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)),
...@@ -26,11 +27,12 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { ...@@ -26,11 +27,12 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
val jobs = new JobProgressUI(sc) val jobs = new JobProgressUI(sc)
val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ handlers val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ handlers
/** Bind the HTTP server which backs this web interface */
def bind() { def bind() {
/** Start an HTTP server to run the Web interface */
try { try {
val (server, usedPort) = JettyUtils.startJettyServer("0.0.0.0", port, allHandlers) val (srv, usedPort) = JettyUtils.startJettyServer("0.0.0.0", port, allHandlers)
logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort)) logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort))
server = Some(srv)
boundPort = Some(usedPort) boundPort = Some(usedPort)
} catch { } catch {
case e: Exception => case e: Exception =>
...@@ -38,6 +40,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { ...@@ -38,6 +40,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
System.exit(1) System.exit(1)
} }
} }
/** Initialize all components of the server */ /** Initialize all components of the server */
def start() { def start() {
// NOTE: This is decoupled from bind() because of the following dependency cycle: // NOTE: This is decoupled from bind() because of the following dependency cycle:
...@@ -47,6 +50,10 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { ...@@ -47,6 +50,10 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
jobs.start() jobs.start()
} }
def stop() {
server.foreach(_.stop())
}
private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1") private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
} }
......
...@@ -8,3 +8,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: ...@@ -8,3 +8,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}:
# Ignore messages below warning level from Jetty, because it's a bit verbose # Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty=WARN
org.eclipse.jetty.LEVEL=WARN
...@@ -10,6 +10,7 @@ import org.scalatest.time.{Span, Millis} ...@@ -10,6 +10,7 @@ import org.scalatest.time.{Span, Millis}
import org.scalacheck.Arbitrary._ import org.scalacheck.Arbitrary._
import org.scalacheck.Gen import org.scalacheck.Gen
import org.scalacheck.Prop._ import org.scalacheck.Prop._
import org.eclipse.jetty.server.{Server, Request, Handler}
import com.google.common.io.Files import com.google.common.io.Files
...@@ -17,6 +18,7 @@ import scala.collection.mutable.ArrayBuffer ...@@ -17,6 +18,7 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._ import SparkContext._
import storage.{GetBlock, BlockManagerWorker, StorageLevel} import storage.{GetBlock, BlockManagerWorker, StorageLevel}
import ui.JettyUtils
class NotSerializableClass class NotSerializableClass
......
package spark.ui
import org.scalatest.FunSuite
import org.eclipse.jetty.server.Server
class UISuite extends FunSuite {
test("jetty port increases under contention") {
val startPort = 33333
val server = new Server(startPort)
server.start()
val (jettyServer, boundPort) = JettyUtils.startJettyServer("localhost", startPort, Seq())
assert(boundPort === startPort + 1)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment