diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 63a5a2093ebaa746a7aecf4c63b03c74ff6d7faa..6c36f3cca45e0aaa849784c1ed184b32c673a2ae 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -3,8 +3,8 @@
 #  This file configures Spark's internal metrics system. The metrics system is
 #  divided into instances which correspond to internal components.
 #  Each instance can be configured to report its metrics to one or more sinks.
-#  Accepted values for [instance] are "master", "worker", "executor", "driver", 
-#  and "applications". A wild card "*" can be used as an instance name, in 
+#  Accepted values for [instance] are "master", "worker", "executor", "driver",
+#  and "applications". A wild card "*" can be used as an instance name, in
 #  which case all instances will inherit the supplied property.
 #
 #  Within an instance, a "source" specifies a particular set of grouped metrics.
@@ -19,7 +19,7 @@
 #  A "sink" specifies where metrics are delivered to. Each instance can be
 #  assigned one or more sinks.
 #
-#  The sink|source field specifies whether the property relates to a sink or 
+#  The sink|source field specifies whether the property relates to a sink or
 #  source.
 #
 #  The [name] field specifies the name of source or sink.
@@ -28,18 +28,24 @@
 #  source or sink is responsible for parsing this property.
 #
 #  Notes:
-#    1. To add a new sink, set the "class" option to a fully qualified class 
+#    1. To add a new sink, set the "class" option to a fully qualified class
 #    name (see examples below).
 #    2. Some sinks involve a polling period. The minimum allowed polling period
 #    is  1 second.
-#    3. Wild card properties can be overridden by more specific properties. 
-#    For example, master.sink.console.period takes precedence over 
+#    3. Wild card properties can be overridden by more specific properties.
+#    For example, master.sink.console.period takes precedence over
 #    *.sink.console.period.
 #    4. A metrics specific configuration
 #    "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
 #    added to Java properties using -Dspark.metrics.conf=xxx if you want to
 #    customize metrics system. You can also put the file in ${SPARK_HOME}/conf
 #    and it will be loaded automatically.
+#    5. MetricsServlet is added by default as a sink in the master, worker and
+#    client driver. Send an HTTP request to "/metrics/json" for a snapshot of
+#    all registered metrics in JSON format. For the master, "/metrics/master/json"
+#    and "/metrics/applications/json" serve separate snapshots for the master and
+#    applications instances. MetricsServlet needs no configuration by hand.
+#
 
 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=spark.metrics.sink.JmxSink
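As a sketch of how these defaults combine with user-supplied properties (the
console sink lines below are hypothetical user additions; the commented servlet
entries restate the defaults this patch bakes into MetricsConfig), a
metrics.properties like the following gives every instance a console sink,
while master.sink.console.period takes precedence over the wildcard period:

```properties
*.sink.console.class=spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds
master.sink.console.period=20

# Defaults already set by MetricsConfig.setDefaultProperties, shown for reference:
#*.sink.servlet.class=spark.metrics.sink.MetricsServlet
#*.sink.servlet.uri=/metrics/json
#master.sink.servlet.uri=/metrics/master/json
#applications.sink.servlet.uri=/metrics/applications/json
```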
diff --git a/core/pom.xml b/core/pom.xml
index 485aa29f83148f6bb2ff734c69e37188125635bb..dfadd22d4214b4283765a857bf0e89f10bd57290 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -126,6 +126,10 @@
       <groupId>com.codahale.metrics</groupId>
       <artifactId>metrics-jvm</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-json</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.derby</groupId>
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 4a4d9908a06f769f562789078e4d72fdb88b8e04..152cb2887a8f5ff0b9a2133ea65db1681ed07a5c 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -57,14 +57,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   var firstApp: Option[ApplicationInfo] = None
 
-  val webUi = new MasterWebUI(self, webUiPort)
-
   Utils.checkHost(host, "Expected hostname")
 
   val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
   val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
   val masterSource = new MasterSource(this)
 
+  val webUi = new MasterWebUI(this, webUiPort)
+
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
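The reordering above is not cosmetic: Scala initializes constructor-body vals
in declaration order, and MasterWebUI now reads the master's metrics systems at
construction time, so it must be created after masterMetricsSystem and
applicationMetricsSystem are assigned. A minimal sketch of the failure mode
(all names hypothetical):

```scala
class Owner {
  val early = new Viewer(this)   // constructed before `resource` is assigned
  val resource = "ready"
  val late = new Viewer(this)    // constructed after, sees the real value
}

class Viewer(owner: Owner) {
  val seen = owner.resource      // null when constructed too early
}

object InitOrderDemo extends App {
  val o = new Owner
  println(o.early.seen)          // null
  println(o.late.seen)           // ready
}
```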
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 36a1e91b2479127f0594f8a5f00dc7ae136c6696..405a1ec3a60cafc50fa548c609fb6729fa9bb6f7 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -33,7 +33,7 @@ import spark.deploy.master.ExecutorInfo
 import spark.ui.UIUtils
 
 private[spark] class ApplicationPage(parent: MasterWebUI) {
-  val master = parent.master
+  val master = parent.masterActorRef
   implicit val timeout = parent.timeout
 
   /** Executor details for a particular application */
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index d3b10f197bd3d0ac3f7731b39d8936f4b40991db..4443d880560ec92eb57f65d606e02e3a6b776140 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -35,7 +35,7 @@ import spark.deploy.master.{ApplicationInfo, WorkerInfo}
 import spark.ui.UIUtils
 
 private[spark] class IndexPage(parent: MasterWebUI) {
-  val master = parent.master
+  val master = parent.masterActorRef
   implicit val timeout = parent.timeout
 
   def renderJson(request: HttpServletRequest): JValue = {
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
index 31bdb7854e4977259c32869f304a89d484b08efe..f0a6ffe047a130c7e3252ac4e2ca23022e2bdf58 100644
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,7 +17,6 @@
 
 package spark.deploy.master.ui
 
-import akka.actor.ActorRef
 import akka.util.Duration
 
 import javax.servlet.http.HttpServletRequest
@@ -25,6 +24,7 @@ import javax.servlet.http.HttpServletRequest
 import org.eclipse.jetty.server.{Handler, Server}
 
 import spark.{Logging, Utils}
+import spark.deploy.master.Master
 import spark.ui.JettyUtils
 import spark.ui.JettyUtils._
 
@@ -32,12 +32,14 @@ import spark.ui.JettyUtils._
  * Web UI server for the standalone master.
  */
 private[spark]
-class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
+class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
   implicit val timeout = Duration.create(
     System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
   val host = Utils.localHostName()
   val port = requestedPort
 
+  val masterActorRef = master.self
+
   var server: Option[Server] = None
   var boundPort: Option[Int] = None
 
@@ -57,7 +59,11 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
     }
   }
 
-  val handlers = Array[(String, Handler)](
+  val metricsHandlers =
+    master.masterMetricsSystem.metricsServlet.map(_.getHandlers).getOrElse(Array()) ++
+    master.applicationMetricsSystem.metricsServlet.map(_.getHandlers).getOrElse(Array())
+
+  val handlers = metricsHandlers ++ Array[(String, Handler)](
     ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
     ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
     ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 0e46fa281ec0ef6c8f46c66d41b855862533a0f1..0b5013b86448b43a007738a78b17444c9d7d76c7 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -101,6 +101,7 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
+
     webUi.start()
     connectToMaster()
 
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index 742e0a5fb66ee1ac292c44940344e7144679d585..b408c63a02d3cfa76a5158f6438aadbd49c2acbc 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -17,7 +17,6 @@
 
 package spark.deploy.worker.ui
 
-import akka.actor.ActorRef
 import akka.util.{Duration, Timeout}
 
 import java.io.{FileInputStream, File}
@@ -49,7 +48,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
 
   val indexPage = new IndexPage(this)
 
-  val handlers = Array[(String, Handler)](
+  val metricsHandlers = worker.metricsSystem.metricsServlet.map(_.getHandlers).getOrElse(Array())
+
+  val handlers = metricsHandlers ++ Array[(String, Handler)](
     ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
     ("/log", (request: HttpServletRequest) => log(request)),
     ("/logPage", (request: HttpServletRequest) => logPage(request)),
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index 3e32e9c82f87f89816cd66b4e7245a6fd394904d..d7fb5378a495aa1488970a035f3a9653c75321f6 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -36,7 +36,11 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
   var propertyCategories: mutable.HashMap[String, Properties] = null
 
   private def setDefaultProperties(prop: Properties) {
-    // empty function, any default property can be set here
+    prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
+    prop.setProperty("*.sink.servlet.uri", "/metrics/json")
+    prop.setProperty("*.sink.servlet.sample", "false")
+    prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
+    prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
   }
 
   def initialize() {
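With these defaults in place, every instance inherits the wildcard servlet
entries, and the master/applications URIs override the wildcard URI. A short
sketch of what a caller observes, based on the expectations in
MetricsConfigSuite further down (the package and object name are hypothetical;
MetricsConfig is private[spark], so real code must sit under the spark package):

```scala
package spark.examples

import spark.metrics.MetricsConfig

object DefaultsDemo extends App {
  // No config file given, so only the defaults above apply
  val conf = new MetricsConfig(None)
  conf.initialize()

  val masterProp = conf.getInstance("master")
  // Inherited unchanged from the "*" entry
  assert(masterProp.getProperty("sink.servlet.class") == "spark.metrics.sink.MetricsServlet")
  // master.sink.servlet.uri takes precedence over *.sink.servlet.uri
  assert(masterProp.getProperty("sink.servlet.uri") == "/metrics/master/json")
}
```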
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index 1dacafa13517c66db4ccf20b28177672d9b2dc9c..04c750b17e95ae4963757999b14aa5f43c932c95 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 
 import spark.Logging
-import spark.metrics.sink.Sink
+import spark.metrics.sink.{MetricsServlet, Sink}
 import spark.metrics.source.Source
 
 /**
@@ -35,7 +35,7 @@ import spark.metrics.source.Source
  * "instance" specify "who" (the role) use metrics system. In spark there are several roles
  * like master, worker, executor, client driver, these roles will create metrics system
  * for monitoring. So instance represents these roles. Currently in Spark, several instances
- * have already implemented: master, worker, executor, driver.
+ * have already been implemented: master, worker, executor, driver, applications.
  *
  * "source" specify "where" (source) to collect metrics data. In metrics system, there exists
  * two kinds of source:
@@ -51,8 +51,8 @@ import spark.metrics.source.Source
  * Metrics configuration format is like below:
  * [instance].[sink|source].[name].[options] = xxxx
  *
- * [instance] can be "master", "worker", "executor", "driver", which means only the specified
- * instance has this property.
+ * [instance] can be "master", "worker", "executor", "driver" or "applications", which means
+ * only the specified instance has this property.
  * wild card "*" can be used to replace instance name, which means all the instances will have
  * this property.
  *
@@ -72,6 +72,9 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
   val sources = new mutable.ArrayBuffer[Source]
   val registry = new MetricRegistry()
 
+  // Treat MetricsServlet as a special sink: it must be exposed so the web UI can attach its handlers
+  var metricsServlet: Option[MetricsServlet] = None
+
   metricsConfig.initialize()
   registerSources()
   registerSinks()
@@ -126,7 +129,11 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
         val sink = Class.forName(classPath)
           .getConstructor(classOf[Properties], classOf[MetricRegistry])
           .newInstance(kv._2, registry)
-        sinks += sink.asInstanceOf[Sink]
+        if (kv._1 == "servlet") {
+          metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+        } else {
+          sinks += sink.asInstanceOf[Sink]
+        }
       } catch {
         case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
       }
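Since registerSinks instantiates each sink reflectively through a
(Properties, MetricRegistry) constructor, a third-party sink only has to match
that constructor shape and implement Sink. A hypothetical minimal sink, placed
under spark.metrics.sink in case the Sink trait is package-private, and enabled
with `*.sink.stdout.class=spark.metrics.sink.StdoutSink`:

```scala
package spark.metrics.sink

import java.util.Properties

import com.codahale.metrics.MetricRegistry

// Hypothetical example sink, not part of this patch
class StdoutSink(val property: Properties, val registry: MetricRegistry) extends Sink {
  // Called once when the owning MetricsSystem starts
  override def start() {
    println("StdoutSink started, tracking " + registry.getNames.size + " metrics")
  }

  override def stop() { }
}
```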
diff --git a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
new file mode 100644
index 0000000000000000000000000000000000000000..17432b1ed1a56e6c7d58351ad33e406357374067
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.sink
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.json.MetricsModule
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import spark.ui.JettyUtils
+
+class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val SERVLET_KEY_URI = "uri"
+  val SERVLET_KEY_SAMPLE = "sample"
+
+  val servletURI = property.getProperty(SERVLET_KEY_URI)
+
+  val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
+
+  val mapper = new ObjectMapper().registerModule(
+    new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
+
+  def getHandlers = Array[(String, Handler)](
+    (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
+  )
+
+  def getMetricsSnapshot(request: HttpServletRequest): String = {
+    mapper.writeValueAsString(registry)
+  }
+
+  override def start() { }
+
+  override def stop() { }
+}
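The servlet's JSON rendering is plain Jackson plus metrics-json's
MetricsModule; the same serialization path can be exercised standalone, with a
hypothetical gauge registered for illustration:

```scala
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{Gauge, MetricRegistry}
import com.codahale.metrics.json.MetricsModule
import com.fasterxml.jackson.databind.ObjectMapper

object SnapshotDemo extends App {
  val registry = new MetricRegistry
  registry.register("demo.answer", new Gauge[Int] {
    override def getValue: Int = 42
  })

  // Same rate/duration units and sample setting as the servlet's defaults
  val mapper = new ObjectMapper().registerModule(
    new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, false))

  // Prints something like: {"version":"3.0.0","gauges":{"demo.answer":{"value":42}},...}
  println(mapper.writeValueAsString(registry))
}
```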
diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala
index ca6088ad93a5f8672d58b3e34007ded60c1ff09c..1cc85124d3ac6f3a785b99331527b550f3270150 100644
--- a/core/src/main/scala/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/spark/ui/JettyUtils.scala
@@ -48,7 +48,7 @@ private[spark] object JettyUtils extends Logging {
   implicit def textResponderToHandler(responder: Responder[String]): Handler =
     createHandler(responder, "text/plain")
 
-  private def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
+  def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
                                  extractFn: T => String = (in: Any) => in.toString): Handler = {
     new AbstractHandler {
       def handle(target: String,
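Widening createHandler from private to public is what lets MetricsServlet build
its handler outside spark.ui. A hedged usage sketch (the route and response are
made up, and the package is hypothetical so that private[spark] members are
visible):

```scala
package spark.examples

import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.server.Handler

import spark.ui.JettyUtils

object PingHandler {
  // A (path, Handler) pair in the same shape the web UIs merge into their arrays
  val handler: (String, Handler) =
    ("/ping", JettyUtils.createHandler((request: HttpServletRequest) => "pong", "text/plain"))
}
```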
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 7599f82a94414558f182c775a5a191a67c177ca7..4bcfdeb62bcb68140b595abbfe5bdf9c7fdfdbd8 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
 
 import org.eclipse.jetty.server.{Handler, Server}
 
-import spark.{Logging, SparkContext, Utils}
+import spark.{Logging, SparkContext, SparkEnv, Utils}
 import spark.ui.env.EnvironmentUI
 import spark.ui.exec.ExecutorsUI
 import spark.ui.storage.BlockManagerUI
@@ -43,8 +43,13 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   val jobs = new JobProgressUI(sc)
   val env = new EnvironmentUI(sc)
   val exec = new ExecutorsUI(sc)
+
+  // Add MetricsServlet handlers by default
+  val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet.map(_.getHandlers)
+    .getOrElse(Array())
+
   val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
-    exec.getHandlers ++ handlers
+    exec.getHandlers ++ metricsServletHandlers ++ handlers
 
   /** Bind the HTTP server which backs this web interface */
   def bind() {
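Once the driver UI is bound, the servlet's snapshot is served from the same
port as the rest of the UI. A hedged smoke test, assuming a local context and
that the UI landed on its default port (check the port logged at startup):

```scala
import scala.io.Source

import spark.SparkContext

object DriverMetricsCheck extends App {
  val sc = new SparkContext("local", "metrics-check")
  // Adjust the port if spark.ui.port is set or the default was already taken
  val json = Source.fromURL("http://localhost:4040/metrics/json").mkString
  println(json.take(200))
  sc.stop()
}
```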
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
index 87cd2ffad25ba11b8a2c377276c5a516d70a3c5d..b0213b62d9c993168a4e3385d801134e239df4e7 100644
--- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -1,12 +1,24 @@
-package spark.metrics
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import java.util.Properties
-import java.io.{File, FileOutputStream}
+package spark.metrics
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import spark.metrics._
-
 class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
   var filePath: String = _
 
@@ -18,11 +30,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     val conf = new MetricsConfig(Option("dummy-file"))
     conf.initialize()
 
-    assert(conf.properties.size() === 0)
+    assert(conf.properties.size() === 5)
     assert(conf.properties.getProperty("test-for-dummy") === null)
 
     val property = conf.getInstance("random")
-    assert(property.size() === 0)
+    assert(property.size() === 3)
+    assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+    assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
+    assert(property.getProperty("sink.servlet.sample") === "false")
   }
 
   test("MetricsConfig with properties set") {
@@ -30,16 +45,22 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     conf.initialize()
 
     val masterProp = conf.getInstance("master")
-    assert(masterProp.size() === 3)
+    assert(masterProp.size() === 6)
     assert(masterProp.getProperty("sink.console.period") === "20")
     assert(masterProp.getProperty("sink.console.unit") === "minutes")
     assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+    assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+    assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
+    assert(masterProp.getProperty("sink.servlet.sample") === "false")
 
     val workerProp = conf.getInstance("worker")
-    assert(workerProp.size() === 3)
+    assert(workerProp.size() === 6)
     assert(workerProp.getProperty("sink.console.period") === "10")
     assert(workerProp.getProperty("sink.console.unit") === "seconds")
-    assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+    assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+    assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+    assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
+    assert(workerProp.getProperty("sink.servlet.sample") === "false")
   }
 
   test("MetricsConfig with subProperties") {
@@ -47,7 +68,7 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     conf.initialize()
 
     val propCategories = conf.propertyCategories
-    assert(propCategories.size === 2)
+    assert(propCategories.size === 3)
 
     val masterProp = conf.getInstance("master")
     val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
@@ -55,10 +76,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
 
     val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
-    assert(sinkProps.size === 1)
+    assert(sinkProps.size === 2)
     assert(sinkProps.contains("console"))
+    assert(sinkProps.contains("servlet"))
 
     val consoleProps = sinkProps("console")
     assert(consoleProps.size() === 2)
+
+    val servletProps = sinkProps("servlet")
+    assert(servletProps.size() === 3)
   }
 }
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
index c1899964172fa235d4525e7958e3110794e20bb6..35c2ae41e9f2949e94b5a552a50a233655bf2c5c 100644
--- a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
@@ -1,12 +1,24 @@
-package spark.metrics
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import java.util.Properties
-import java.io.{File, FileOutputStream}
+package spark.metrics
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import spark.metrics._
-
 class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
   var filePath: String = _
 
@@ -22,6 +34,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
 
     assert(sources.length === 0)
     assert(sinks.length === 0)
+    assert(metricsSystem.metricsServlet.isDefined)
   }
 
   test("MetricsSystem with sources add") {
@@ -31,6 +44,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
 
     assert(sources.length === 0)
     assert(sinks.length === 1)
+    assert(metricsSystem.metricsServlet.isDefined)
 
     val source = new spark.deploy.master.MasterSource(null)
     metricsSystem.registerSource(source)
diff --git a/pom.xml b/pom.xml
index 1d0cb6a2f9b7ca4593ef715ea19f4d37eba81ab7..b434e0e30309cfff5303600f1ed4dfea98181728 100644
--- a/pom.xml
+++ b/pom.xml
@@ -269,6 +269,11 @@
         <artifactId>metrics-jvm</artifactId>
         <version>3.0.0</version>
       </dependency>
+      <dependency>
+        <groupId>com.codahale.metrics</groupId>
+        <artifactId>metrics-json</artifactId>
+        <version>3.0.0</version>
+      </dependency>
       <dependency>
         <groupId>org.scala-lang</groupId>
         <artifactId>scala-compiler</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f8609256506a55b829b9bb8b1531cd2f5a61b9f2..e5c8e8d230d1c06ce5a62dbeaf530a7aac7c2ed4 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -186,6 +186,7 @@ object SparkBuild extends Build {
       "org.apache.derby" % "derby" % "10.4.2.0" % "test",
       "com.codahale.metrics" % "metrics-core" % "3.0.0",
       "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
+      "com.codahale.metrics" % "metrics-json" % "3.0.0",
       "com.twitter" % "chill_2.9.3" % "0.3.1",
       "com.twitter" % "chill-java" % "0.3.1"
     ) ++ (