From f03d9760fd8ac67fd0865cb355ba75d2eff507fe Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Sun, 27 Jan 2013 23:56:14 -0800
Subject: [PATCH] Clean up BlockManagerUI a little (convert it from an object to
 a class, merge it with its Directives class, and bind to a random port)

---
 core/src/main/scala/spark/SparkContext.scala  |   7 +-
 core/src/main/scala/spark/Utils.scala         |  17 ++-
 .../spark/deploy/master/MasterWebUI.scala     |   6 +-
 .../spark/deploy/worker/WorkerWebUI.scala     |   6 +-
 .../scala/spark/storage/BlockManagerUI.scala  | 120 +++++++++---------
 .../src/main/scala/spark/util/AkkaUtils.scala |   6 +-
 .../scala/spark/util/MetadataCleaner.scala    |   3 +
 7 files changed, 91 insertions(+), 74 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 39721b47ae..77036c1275 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -44,6 +44,7 @@ import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import storage.BlockManagerUI
 import util.{MetadataCleaner, TimeStampedHashMap}
 
 /**
@@ -88,8 +89,9 @@ class SparkContext(
   SparkEnv.set(env)
 
   // Start the BlockManager UI
-  spark.storage.BlockManagerUI.start(SparkEnv.get.actorSystem, 
-    SparkEnv.get.blockManager.master.masterActor, this)
+  private[spark] val ui = new BlockManagerUI(
+    env.actorSystem, env.blockManager.master.masterActor, this)
+  ui.start()
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
   private[spark] val addedFiles = HashMap[String, Long]()
@@ -97,7 +99,6 @@ class SparkContext(
 
   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
-
   private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
 
 
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index ae77264372..1e58d01273 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -1,7 +1,7 @@
 package spark
 
 import java.io._
-import java.net.{NetworkInterface, InetAddress, Inet4Address, URL, URI}
+import java.net._
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
 import org.apache.hadoop.conf.Configuration
@@ -11,6 +11,7 @@ import scala.collection.JavaConversions._
 import scala.io.Source
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import scala.Some
 
 /**
  * Various utility methods used by Spark.
@@ -431,4 +432,18 @@ private object Utils extends Logging {
     }
     "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
   }
+
+  /**
+   * Try to find a free port to bind to on the local host. This should ideally never be needed,
+   * except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray)
+   * don't let users bind to port 0 and then figure out which free port they actually bound to.
+ * We work around this by binding a ServerSocket to port 0 and immediately closing it. This is
+ * not guaranteed to work (another process may grab the port first), but it's the best we can do.
+   */
+  def findFreePort(): Int = {
+    val socket = new ServerSocket(0)
+    val portBound = socket.getLocalPort
+    socket.close()
+    portBound
+  }
 }
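
The bind-and-release approach in findFreePort leaves a small window in which another process could
claim the port before the Spray server binds it. Below is a minimal sketch of how a caller might
guard against that race; the PortRetry object and its startOnPort callback are hypothetical helpers
for illustration only, not part of this patch.

    import java.net.{BindException, ServerSocket}

    object PortRetry {
      // Hypothetical helper: pick a free port, hand it to startOnPort, and retry a few
      // times if another process binds the port between close() and server startup.
      def startWithRetries(startOnPort: Int => Unit, maxRetries: Int = 3) {
        var attempt = 0
        while (attempt < maxRetries) {
          val socket = new ServerSocket(0)   // let the OS pick any free port
          val port = socket.getLocalPort
          socket.close()                     // release it so the real server can bind
          try {
            startOnPort(port)                // e.g. AkkaUtils.startSprayServer(..., port, ...)
            return
          } catch {
            case e: BindException => attempt += 1   // lost the race; try another port
          }
        }
        throw new RuntimeException("Could not bind to a free port after " + maxRetries + " attempts")
      }
    }
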
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 458ee2d665..a01774f511 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -14,12 +14,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
 import spark.deploy._
 import spark.deploy.JsonProtocol._
 
+/**
+ * Web UI server for the standalone master.
+ */
 private[spark]
 class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
   val RESOURCE_DIR = "spark/deploy/master/webui"
   val STATIC_RESOURCE_DIR = "spark/deploy/static"
   
-  implicit val timeout = Timeout(1 seconds)
+  implicit val timeout = Timeout(10 seconds)
   
   val handler = {
     get {
@@ -76,5 +79,4 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
       getFromResourceDirectory(RESOURCE_DIR)
     }
   }
-
 }
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index f9489d99fc..ef81f072a3 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -13,12 +13,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
 import spark.deploy.{WorkerState, RequestWorkerState}
 import spark.deploy.JsonProtocol._
 
+/**
+ * Web UI server for the standalone worker.
+ */
 private[spark]
 class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
   val RESOURCE_DIR = "spark/deploy/worker/webui"
   val STATIC_RESOURCE_DIR = "spark/deploy/static"
   
-  implicit val timeout = Timeout(1 seconds)
+  implicit val timeout = Timeout(10 seconds)
   
   val handler = {
     get {
@@ -50,5 +53,4 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
       getFromResourceDirectory(RESOURCE_DIR)
     }
   }
-  
 }
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index 956ede201e..eda320fa47 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -1,32 +1,41 @@
 package spark.storage
 
 import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Await
 import akka.pattern.ask
 import akka.util.Timeout
 import akka.util.duration._
-import cc.spray.Directives
 import cc.spray.directives._
 import cc.spray.typeconversion.TwirlSupport._
+import cc.spray.Directives
 import scala.collection.mutable.ArrayBuffer
-import spark.{Logging, SparkContext, SparkEnv}
+import spark.{Logging, SparkContext}
 import spark.util.AkkaUtils
 import spark.Utils
 
 
+/**
+ * Web UI server for the BlockManager inside each SparkContext.
+ */
 private[spark]
-object BlockManagerUI extends Logging {
+class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
+  extends Directives with Logging {
+
+  val STATIC_RESOURCE_DIR = "spark/deploy/static"
+
+  implicit val timeout = Timeout(10 seconds)
 
-  /* Starts the Web interface for the BlockManager */
-  def start(actorSystem : ActorSystem, masterActor: ActorRef, sc: SparkContext) {
-    val webUIDirectives = new BlockManagerUIDirectives(actorSystem, masterActor, sc)
+  /** Start an HTTP server to run the web interface */
+  def start() {
     try {
-      // TODO: This needs to find a random free port to bind to. Unfortunately, there's no way
-      // in spray to do that, so we'll have to rely on something like new ServerSocket()
-      val boundPort = AkkaUtils.startSprayServer(actorSystem, "0.0.0.0",
-        Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt,
-        webUIDirectives.handler, "BlockManagerHTTPServer")
-      logInfo("Started BlockManager web UI at %s:%d".format(Utils.localHostName(), boundPort))
+      val port = if (System.getProperty("spark.ui.port") != null) {
+        System.getProperty("spark.ui.port").toInt
+      } else {
+        // TODO: Unfortunately, it's not possible to pass port 0 to spray and figure out which
+        // random port it bound to, so we try to find a free local port by briefly binding a socket.
+        Utils.findFreePort()
+      }
+      AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
+      logInfo("Started BlockManager web UI at http://%s:%d".format(Utils.localHostName(), port))
     } catch {
       case e: Exception =>
         logError("Failed to create BlockManager WebUI", e)
@@ -34,58 +43,43 @@ object BlockManagerUI extends Logging {
     }
   }
 
-}
-
-
-private[spark]
-class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, 
-  sc: SparkContext) extends Directives {  
-
-  val STATIC_RESOURCE_DIR = "spark/deploy/static"
-  implicit val timeout = Timeout(1 seconds)
-
   val handler = {
-    
-    get { path("") { completeWith {
-      // Request the current storage status from the Master
-      val future = master ? GetStorageStatus
-      future.map { status =>
-        val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-        
-        // Calculate macro-level statistics
-        val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
-        val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
-        val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
-          .reduceOption(_+_).getOrElse(0L)
-
-        val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
-
-        spark.storage.html.index.
-          render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
-      }
-    }}} ~
-    get { path("rdd") { parameter("id") { id => { completeWith {
-      val future = master ? GetStorageStatus
-      future.map { status =>
-        val prefix = "rdd_" + id.toString
-
-
-        val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-        val filteredStorageStatusList = StorageUtils.
-          filterStorageStatusByPrefix(storageStatusList, prefix)
-
-        val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
-
-        spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
-
+    get {
+      path("") {
+        completeWith {
+          // Request the current storage status from the Master
+          val future = blockManagerMaster ? GetStorageStatus
+          future.map { status =>
+            // Calculate macro-level statistics
+            val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+            val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+            val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+            val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+              .reduceOption(_+_).getOrElse(0L)
+            val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+            spark.storage.html.index.
+              render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
+          }
+        }
+      } ~
+      path("rdd") {
+        parameter("id") { id =>
+          completeWith {
+            val future = blockManagerMaster ? GetStorageStatus
+            future.map { status =>
+              val prefix = "rdd_" + id.toString
+              val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+              val filteredStorageStatusList = StorageUtils.
+                filterStorageStatusByPrefix(storageStatusList, prefix)
+              val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+              spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
+            }
+          }
+        }
+      } ~
+      pathPrefix("static") {
+        getFromResourceDirectory(STATIC_RESOURCE_DIR)
       }
-    }}}}} ~
-    pathPrefix("static") {
-      getFromResourceDirectory(STATIC_RESOURCE_DIR)
     }
-
   }
-
-  
-
 }
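
With this change, the UI port can be pinned through the spark.ui.port system property (replacing
the old BLOCKMANAGER_UI_PORT environment variable); if the property is unset, a random free port
is used. A small usage sketch, where the master URL and application name are just placeholders:

    // Pin the BlockManager web UI to a fixed port before creating the SparkContext...
    System.setProperty("spark.ui.port", "9080")
    val sc = new SparkContext("local", "UIPortDemo")
    // ...or leave spark.ui.port unset to let the UI bind to a random free port.
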
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 775ff8f1aa..e0fdeffbc4 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -1,6 +1,6 @@
 package spark.util
 
-import akka.actor.{Props, ActorSystemImpl, ActorSystem}
+import akka.actor.{ActorRef, Props, ActorSystemImpl, ActorSystem}
 import com.typesafe.config.ConfigFactory
 import akka.util.duration._
 import akka.pattern.ask
@@ -55,7 +55,7 @@ private[spark] object AkkaUtils {
-   * handle requests. Returns the bound port or throws a SparkException on failure.
+   * handle requests. Returns the server's ActorRef, or throws a SparkException on failure.
    */
   def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route, 
-      name: String = "HttpServer"): Int = {
+      name: String = "HttpServer"): ActorRef = {
     val ioWorker = new IoWorker(actorSystem).start()
     val httpService = actorSystem.actorOf(Props(new HttpService(route)))
     val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
@@ -67,7 +67,7 @@ private[spark] object AkkaUtils {
     try {
       Await.result(future, timeout) match {
         case bound: HttpServer.Bound =>
-          return bound.endpoint.getPort
+          return server
         case other: Any =>
           throw new SparkException("Failed to bind web UI to port " + port + ": " + other)
       }
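
startSprayServer now returns the server's ActorRef instead of the bound port, since the caller
already chooses the port up front (via spark.ui.port or Utils.findFreePort()). Holding the ActorRef
also lets the caller stop the server later; a hypothetical sketch of that pattern inside
BlockManagerUI.start() follows (the explicit shutdown is an assumption, not something this patch does):

    val port = Utils.findFreePort()
    val server = AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
    // ...later, when tearing the UI down, the returned ActorRef can be stopped:
    actorSystem.stop(server)
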
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index 721c4c6029..51fb440108 100644
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -5,6 +5,9 @@ import java.util.{TimerTask, Timer}
 import spark.Logging
 
 
+/**
+ * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries).
+ */
 class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
 
   val delaySeconds = MetadataCleaner.getDelaySeconds
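
For context, MetadataCleaner takes a name and a cleanup callback that is invoked periodically with
a cutoff time. A hypothetical usage sketch pairing it with the TimeStampedHashMap from spark.util,
assuming its clearOldValues(cutoff) helper; the cache name and wiring are illustrative only:

    import spark.util.{MetadataCleaner, TimeStampedHashMap}

    val cache = new TimeStampedHashMap[Int, String]()
    // Drop entries whose timestamps are older than the cutoff the cleaner passes in.
    val cleaner = new MetadataCleaner("ExampleCache", time => cache.clearOldValues(time))
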
-- 
GitLab