From 09c7179e812a06cb43a4975bca15d1b9963da975 Mon Sep 17 00:00:00 2001
From: jerryshao <saisai.shao@intel.com>
Date: Mon, 12 Aug 2013 11:40:40 +0800
Subject: [PATCH] MetricsServlet code refactor according to comments

---
 conf/metrics.properties.template                | 13 ++++---------
 .../main/scala/spark/deploy/master/Master.scala | 11 +----------
 .../deploy/master/ui/ApplicationPage.scala      |  2 +-
 .../spark/deploy/master/ui/IndexPage.scala      |  2 +-
 .../spark/deploy/master/ui/MasterWebUI.scala    | 12 +++++++++---
 .../main/scala/spark/deploy/worker/Worker.scala |  5 -----
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |  5 +++--
 .../scala/spark/metrics/MetricsConfig.scala     |  6 ++++--
 .../scala/spark/metrics/MetricsSystem.scala     |  2 +-
 .../spark/metrics/sink/MetricsServlet.scala     | 10 ++--------
 core/src/main/scala/spark/ui/SparkUI.scala      |  6 ++----
 .../spark/metrics/MetricsConfigSuite.scala      | 17 +++++++++++------
 12 files changed, 39 insertions(+), 52 deletions(-)

diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 07fd046539..6c36f3cca4 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -41,17 +41,12 @@
 #    customize metrics system. You can also put the file in ${SPARK_HOME}/conf
 #    and it will be loaded automatically.
 #    5. MetricsServlet is added by default as a sink in master, worker and client
-#    driver, you can send http request "/metrics" to get a snapshot of all the
-#    registered metrics in json format. For master, requests "/metrics/master" and
-#    "/metrics/applications" can be sent seperately to get metrics snapshot of
-#    instance master and applications.
+#    driver, you can send http request "/metrics/json" to get a snapshot of all the
+#    registered metrics in json format. For master, requests "/metrics/master/json" and
+#    "/metrics/applications/json" can be sent seperately to get metrics snapshot of
+#    instance master and applications. MetricsServlet may not be configured by self.
 #
 
-# Change MetricsServlet's property
-#*.sink.servlet.uri=/metrics
-#
-#*.sink.servlet.sample=false
-
 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=spark.metrics.sink.JmxSink
 
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index f4a74830c6..152cb2887a 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -57,22 +57,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   var firstApp: Option[ApplicationInfo] = None
 
-  val webUi = new MasterWebUI(self, webUiPort)
-
   Utils.checkHost(host, "Expected hostname")
 
   val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
   val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
   val masterSource = new MasterSource(this)
 
-  // Add default MetricsServlet handler to web ui
-  masterMetricsSystem.metricsServlet foreach { m =>
-    webUi.handlers = m.getHandlers ++ webUi.handlers
-  }
-
-  applicationMetricsSystem.metricsServlet foreach { m =>
-    webUi.handlers = m.getHandlers ++ webUi.handlers
-  }
+  val webUi = new MasterWebUI(this, webUiPort)
 
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 36a1e91b24..405a1ec3a6 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -33,7 +33,7 @@ import spark.deploy.master.ExecutorInfo
 import spark.ui.UIUtils
 
 private[spark] class ApplicationPage(parent: MasterWebUI) {
-  val master = parent.master
+  val master = parent.masterActorRef
   implicit val timeout = parent.timeout
 
   /** Executor details for a particular application */
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index d3b10f197b..4443d88056 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -35,7 +35,7 @@ import spark.deploy.master.{ApplicationInfo, WorkerInfo}
 import spark.ui.UIUtils
 
 private[spark] class IndexPage(parent: MasterWebUI) {
-  val master = parent.master
+  val master = parent.masterActorRef
   implicit val timeout = parent.timeout
 
   def renderJson(request: HttpServletRequest): JValue = {
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
index d9503663f4..f0a6ffe047 100644
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,7 +17,6 @@
 
 package spark.deploy.master.ui
 
-import akka.actor.ActorRef
 import akka.util.Duration
 
 import javax.servlet.http.HttpServletRequest
@@ -25,6 +24,7 @@ import javax.servlet.http.HttpServletRequest
 import org.eclipse.jetty.server.{Handler, Server}
 
 import spark.{Logging, Utils}
+import spark.deploy.master.Master
 import spark.ui.JettyUtils
 import spark.ui.JettyUtils._
 
@@ -32,12 +32,14 @@ import spark.ui.JettyUtils._
  * Web UI server for the standalone master.
  */
 private[spark]
-class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
+class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
   implicit val timeout = Duration.create(
     System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
   val host = Utils.localHostName()
   val port = requestedPort
 
+  val masterActorRef = master.self
+
   var server: Option[Server] = None
   var boundPort: Option[Int] = None
 
@@ -57,7 +59,11 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
     }
   }
 
-  var handlers = Array[(String, Handler)](
+  val metricsHandlers = master.masterMetricsSystem.metricsServlet.map(_.getHandlers)
+    .getOrElse(Array()) ++ master.applicationMetricsSystem.metricsServlet.map(_.getHandlers)
+    .getOrElse(Array())
+
+  val handlers = metricsHandlers ++ Array[(String, Handler)](
     ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
     ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
     ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 92f8cbc610..0b5013b864 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -102,11 +102,6 @@ private[spark] class Worker(
     createWorkDir()
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
 
-    // Add default MetricsServlet handlers to webUi
-    metricsSystem.metricsServlet foreach { m =>
-      webUi.handlers = m.getHandlers ++ webUi.handlers
-    }
-
     webUi.start()
     connectToMaster()
 
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index d345cbecac..b408c63a02 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -17,7 +17,6 @@
 
 package spark.deploy.worker.ui
 
-import akka.actor.ActorRef
 import akka.util.{Duration, Timeout}
 
 import java.io.{FileInputStream, File}
@@ -49,7 +48,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
 
   val indexPage = new IndexPage(this)
 
-  var handlers = Array[(String, Handler)](
+  val metricsHandlers = worker.metricsSystem.metricsServlet.map(_.getHandlers).getOrElse(Array())
+
+  val handlers = metricsHandlers ++ Array[(String, Handler)](
     ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
     ("/log", (request: HttpServletRequest) => log(request)),
     ("/logPage", (request: HttpServletRequest) => logPage(request)),
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index d10dc45395..d7fb5378a4 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -37,8 +37,10 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
 
   private def setDefaultProperties(prop: Properties) {
     prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
-    prop.setProperty("master.sink.servlet.uri", "/metrics/master")
-    prop.setProperty("applications.sink.servlet.uri", "/metrics/applications")
+    prop.setProperty("*.sink.servlet.uri", "/metrics/json")
+    prop.setProperty("*.sink.servlet.sample", "false")
+    prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
+    prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
   }
 
   def initialize() {
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index ae1f853691..04c750b17e 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -129,7 +129,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
         val sink = Class.forName(classPath)
           .getConstructor(classOf[Properties], classOf[MetricRegistry])
           .newInstance(kv._2, registry)
-        if (kv._1 =="servlet") {
+        if (kv._1 == "servlet") {
            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
         } else {
           sinks += sink.asInstanceOf[Sink]
diff --git a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
index 39ede9b2df..17432b1ed1 100644
--- a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
@@ -34,15 +34,9 @@ class MetricsServlet(val property: Properties, val registry: MetricRegistry) ext
   val SERVLET_KEY_URI = "uri"
   val SERVLET_KEY_SAMPLE = "sample"
 
-  val SERVLET_DEFAULT_URI = "/metrics"
-  val SERVLET_DEFAULT_SAMPLE = false
+  val servletURI = property.getProperty(SERVLET_KEY_URI)
 
-  val servletURI = property.getProperty(SERVLET_KEY_URI, SERVLET_DEFAULT_URI)
-
-  val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)) match {
-    case Some(s) => s.toBoolean
-    case None => SERVLET_DEFAULT_SAMPLE
-  }
+  val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
 
   val mapper = new ObjectMapper().registerModule(
     new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 7e8a41c72e..4bcfdeb62b 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -45,10 +45,8 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   val exec = new ExecutorsUI(sc)
 
   // Add MetricsServlet handlers by default
-  val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet match {
-    case Some(s) => s.getHandlers
-    case None => Array[(String, Handler)]()
-  }
+  val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet.map(_.getHandlers)
+    .getOrElse(Array())
 
   val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
     exec.getHandlers ++ metricsServletHandlers ++ handlers
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
index df999cd532..b0213b62d9 100644
--- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -30,12 +30,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     val conf = new MetricsConfig(Option("dummy-file"))
     conf.initialize()
 
-    assert(conf.properties.size() === 3)
+    assert(conf.properties.size() === 5)
     assert(conf.properties.getProperty("test-for-dummy") === null)
 
     val property = conf.getInstance("random")
-    assert(property.size() === 1)
+    assert(property.size() === 3)
     assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+    assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
+    assert(property.getProperty("sink.servlet.sample") === "false")
   }
 
   test("MetricsConfig with properties set") {
@@ -43,19 +45,22 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     conf.initialize()
 
     val masterProp = conf.getInstance("master")
-    assert(masterProp.size() === 5)
+    assert(masterProp.size() === 6)
     assert(masterProp.getProperty("sink.console.period") === "20")
     assert(masterProp.getProperty("sink.console.unit") === "minutes")
     assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
     assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
-    assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master")
+    assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
+    assert(masterProp.getProperty("sink.servlet.sample") === "false")
 
     val workerProp = conf.getInstance("worker")
-    assert(workerProp.size() === 4)
+    assert(workerProp.size() === 6)
     assert(workerProp.getProperty("sink.console.period") === "10")
     assert(workerProp.getProperty("sink.console.unit") === "seconds")
     assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
     assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+    assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
+    assert(workerProp.getProperty("sink.servlet.sample") === "false")
   }
 
   test("MetricsConfig with subProperties") {
@@ -79,6 +84,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     assert(consoleProps.size() === 2)
 
     val servletProps = sinkProps("servlet")
-    assert(servletProps.size() === 2)
+    assert(servletProps.size() === 3)
   }
 }
-- 
GitLab