From 42157027f247035e9ae41efe899e27c0942f5cd8 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Mon, 24 Jun 2013 16:25:05 -0700
Subject: [PATCH] A few bug fixes and a unit test

---
 core/src/main/scala/spark/SparkContext.scala      |  1 +
 .../main/scala/spark/deploy/master/Master.scala   |  9 +++++----
 .../spark/deploy/master/ui/MasterWebUI.scala      | 10 ++++++++--
 .../main/scala/spark/deploy/worker/Worker.scala   |  9 +++------
 .../spark/deploy/worker/ui/WorkerWebUI.scala      | 10 ++++++++--
 core/src/main/scala/spark/ui/JettyUtils.scala     |  7 +++++--
 core/src/main/scala/spark/ui/SparkUI.scala        | 13 ++++++++++---
 core/src/test/resources/log4j.properties          |  1 +
 core/src/test/scala/spark/DistributedSuite.scala  |  2 ++
 core/src/test/scala/spark/ui/UISuite.scala        | 15 +++++++++++++++
 10 files changed, 58 insertions(+), 19 deletions(-)
 create mode 100644 core/src/test/scala/spark/ui/UISuite.scala

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1f420d73f7..500d25efdd 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -578,6 +578,7 @@ class SparkContext(
 
   /** Shut down the SparkContext. */
   def stop() {
+    ui.stop()
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 3eca522c7e..2a49dfb486 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -36,6 +36,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   var firstApp: Option[ApplicationInfo] = None
 
+  val webUi = new MasterWebUI(self)
+
   Utils.checkHost(host, "Expected hostname")
 
   val masterPublicAddress = {
@@ -52,13 +54,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     logInfo("Starting Spark master at spark://" + host + ":" + port)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-    startWebUi()
+    webUi.start()
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
   }
 
-  def startWebUi() {
-    val webUi = new MasterWebUI(self)
-    webUi.start()
+  override def postStop() {
+    webUi.stop()
   }
 
   override def receive = {
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
index 5fd17f10c6..065b62a4b2 100644
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -5,7 +5,7 @@ import akka.util.Duration
 
 import javax.servlet.http.HttpServletRequest
 
-import org.eclipse.jetty.server.Handler
+import org.eclipse.jetty.server.{Handler, Server}
 
 import spark.{Logging, Utils}
 import spark.ui.JettyUtils
@@ -21,13 +21,15 @@ class MasterWebUI(val master: ActorRef) extends Logging {
   val host = Utils.localHostName()
   val port = Option(System.getProperty("master.ui.port"))
     .getOrElse(MasterWebUI.DEFAULT_PORT).toInt
+  var server: Option[Server] = None
 
   val applicationPage = new ApplicationPage(this)
   val indexPage = new IndexPage(this)
 
   def start() {
     try {
-      val (server, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+      val (srv, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+      server = Some(srv)
       logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
     } catch {
       case e: Exception =>
@@ -42,6 +44,10 @@ class MasterWebUI(val master: ActorRef) extends Logging {
     ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
     ("*", (request: HttpServletRequest) => indexPage.render(request))
   )
+
+  def stop() {
+    server.foreach(_.stop())
+  }
 }
 
 object MasterWebUI {
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 690bdfe128..28553b6c02 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -45,6 +45,7 @@ private[spark] class Worker(
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
+  val webUi = new WorkerWebUI(self, workDir)
 
   var coresUsed = 0
   var memoryUsed = 0
@@ -77,7 +78,7 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     connectToMaster()
-    startWebUi()
+    webUi.start()
   }
 
   def connectToMaster() {
@@ -88,11 +89,6 @@ private[spark] class Worker(
     context.watch(master) // Doesn't work with remote actors, but useful for testing
   }
 
-  def startWebUi() {
-    val webUi = new WorkerWebUI(self, workDir)
-    webUi.start()
-  }
-
   override def receive = {
     case RegisteredWorker(url) =>
       masterWebUiUrl = url
@@ -163,6 +159,7 @@ private[spark] class Worker(
 
   override def postStop() {
     executors.values.foreach(_.kill())
+    webUi.stop()
   }
 }
 
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index abfc847527..ee3889192d 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -7,7 +7,7 @@ import java.io.File
 
 import javax.servlet.http.HttpServletRequest
 
-import org.eclipse.jetty.server.Handler
+import org.eclipse.jetty.server.{Handler, Server}
 
 import scala.io.Source
 
@@ -25,6 +25,7 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
   val host = Utils.localHostName()
   val port = Option(System.getProperty("wroker.ui.port"))
     .getOrElse(WorkerWebUI.DEFAULT_PORT).toInt
+  var server: Option[Server] = None
 
   val indexPage = new IndexPage(this)
 
@@ -37,7 +38,8 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
 
   def start() {
     try {
-      val (server, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+      val (srv, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+      server = Some(srv)
       logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
     } catch {
       case e: Exception =>
@@ -56,6 +58,10 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
     source.close()
     lines
   }
+
+  def stop() {
+    server.foreach(_.stop())
+  }
 }
 
 object WorkerWebUI {
diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala
index b5270e6062..85d6a7e867 100644
--- a/core/src/main/scala/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/spark/ui/JettyUtils.scala
@@ -12,7 +12,8 @@ import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHa
 import scala.util.{Try, Success, Failure}
 import scala.xml.Node
 
-import spark.{Utils, SparkContext, Logging}
+import spark.{SparkContext, Logging}
+import org.eclipse.jetty.util.log.Log
 
 /** Utilities for launching a web server using Jetty's HTTP Server class */
 private[spark] object JettyUtils extends Logging {
@@ -91,12 +92,14 @@ private[spark] object JettyUtils extends Logging {
 
     @tailrec
     def connect(currentPort: Int): (Server, Int) = {
-      val server = new Server(port)
+      val server = new Server(currentPort)
       server.setHandler(handlerList)
       Try { server.start() } match {
         case s: Success[_] => (server, currentPort)
         case f: Failure[_] =>
+          server.stop()
           logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
+          logInfo("Error was: " + f.toString)
           connect((currentPort + 1) % 65536)
       }
     }
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 2d5a328015..487f005615 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -2,7 +2,7 @@ package spark.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import org.eclipse.jetty.server.Handler
+import org.eclipse.jetty.server.{Handler, Server}
 
 import spark.{Logging, SparkContext, Utils}
 import spark.ui.storage.BlockManagerUI
@@ -17,6 +17,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   val host = Utils.localHostName()
   val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt
   var boundPort: Option[Int] = None
+  var server: Option[Server] = None
 
   val handlers = Seq[(String, Handler)](
     ("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)),
@@ -26,11 +27,12 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   val jobs = new JobProgressUI(sc)
   val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ handlers
 
+  /** Bind the HTTP server which backs this web interface */
   def bind() {
-    /** Start an HTTP server to run the Web interface */
     try {
-      val (server, usedPort) = JettyUtils.startJettyServer("0.0.0.0", port, allHandlers)
+      val (srv, usedPort) = JettyUtils.startJettyServer("0.0.0.0", port, allHandlers)
       logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort))
+      server = Some(srv)
       boundPort = Some(usedPort)
     } catch {
     case e: Exception =>
@@ -38,6 +40,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
       System.exit(1)
     }
   }
+
   /** Initialize all components of the server */
   def start() {
     // NOTE: This is decoupled from bind() because of the following dependency cycle:
@@ -47,6 +50,10 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
     jobs.start()
   }
 
+  def stop() {
+    server.foreach(_.stop())
+  }
+
   private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
 }
 
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 6ec89c0184..d05cf3dec1 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -8,3 +8,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}:
 
 # Ignore messages below warning level from Jetty, because it's a bit verbose
 log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 0866fb47b3..0024ede828 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -10,6 +10,7 @@ import org.scalatest.time.{Span, Millis}
 import org.scalacheck.Arbitrary._
 import org.scalacheck.Gen
 import org.scalacheck.Prop._
+import org.eclipse.jetty.server.{Server, Request, Handler}
 
 import com.google.common.io.Files
 
@@ -17,6 +18,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import SparkContext._
 import storage.{GetBlock, BlockManagerWorker, StorageLevel}
+import ui.JettyUtils
 
 
 class NotSerializableClass
diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala
new file mode 100644
index 0000000000..6766b158f6
--- /dev/null
+++ b/core/src/test/scala/spark/ui/UISuite.scala
@@ -0,0 +1,15 @@
+package spark.ui
+
+import org.scalatest.FunSuite
+import org.eclipse.jetty.server.Server
+
+class UISuite extends FunSuite {
+  test("jetty port increases under contention") {
+    val startPort = 33333
+    val server = new Server(startPort)
+    server.start()
+    val (jettyServer, boundPort) = JettyUtils.startJettyServer("localhost", startPort, Seq())
+    assert(boundPort === startPort + 1)
+  }
+
+}
-- 
GitLab