Commit f03d9760 authored by Matei Zaharia

Clean up BlockManagerUI a little (make it not be an object, merge with Directives, and bind to a random port)
parent 90985072
@@ -44,6 +44,7 @@ import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import storage.BlockManagerUI
 import util.{MetadataCleaner, TimeStampedHashMap}
 
 /**
@@ -88,8 +89,9 @@ class SparkContext(
   SparkEnv.set(env)
 
   // Start the BlockManager UI
-  spark.storage.BlockManagerUI.start(SparkEnv.get.actorSystem,
-    SparkEnv.get.blockManager.master.masterActor, this)
+  private[spark] val ui = new BlockManagerUI(
+    env.actorSystem, env.blockManager.master.masterActor, this)
+  ui.start()
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
   private[spark] val addedFiles = HashMap[String, Long]()
@@ -97,7 +99,6 @@ class SparkContext(
   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
-
   private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
 package spark
 
 import java.io._
-import java.net.{NetworkInterface, InetAddress, Inet4Address, URL, URI}
+import java.net._
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
 import org.apache.hadoop.conf.Configuration
@@ -11,6 +11,7 @@ import scala.collection.JavaConversions._
 import scala.io.Source
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import scala.Some
 
 /**
  * Various utility methods used by Spark.
@@ -431,4 +432,18 @@ private object Utils extends Logging {
     }
     "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
   }
+
+  /**
+   * Try to find a free port to bind to on the local host. This should ideally never be needed,
+   * except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray)
+   * don't let users bind to port 0 and then figure out which free port they actually bound to.
+   * We work around this by binding a ServerSocket and immediately unbinding it. This is *not*
+   * necessarily guaranteed to work, but it's the best we can do.
+   */
+  def findFreePort(): Int = {
+    val socket = new ServerSocket(0)
+    val portBound = socket.getLocalPort
+    socket.close()
+    portBound
+  }
 }
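The new doc comment is explicit that the port returned this way is not reserved: another process can take it between socket.close() and the moment the web server actually binds. A minimal sketch of using the helper under that caveat; the retry wrapper below is illustrative only and not part of this commit:

    import java.net.{BindException, ServerSocket}

    // Same trick as Utils.findFreePort(): ask the OS for any free port, then release it.
    def findFreePort(): Int = {
      val socket = new ServerSocket(0)
      val port = socket.getLocalPort
      socket.close()   // the port is free right now, but nothing keeps it reserved
      port
    }

    // Illustrative caller: retry a few times in case another process grabs the port
    // between the close() above and the real bind below.
    def bindToFreePort(bind: Int => Unit, attempts: Int = 3): Int = {
      val candidate = findFreePort()
      try {
        bind(candidate)
        candidate
      } catch {
        case e: BindException if attempts > 1 => bindToFreePort(bind, attempts - 1)
      }
    }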
@@ -14,12 +14,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
 import spark.deploy._
 import spark.deploy.JsonProtocol._
 
+/**
+ * Web UI server for the standalone master.
+ */
 private[spark]
 class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
   val RESOURCE_DIR = "spark/deploy/master/webui"
   val STATIC_RESOURCE_DIR = "spark/deploy/static"
 
-  implicit val timeout = Timeout(1 seconds)
+  implicit val timeout = Timeout(10 seconds)
 
   val handler = {
     get {
@@ -76,5 +79,4 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
       getFromResourceDirectory(RESOURCE_DIR)
     }
   }
-
 }
@@ -13,12 +13,15 @@ import cc.spray.typeconversion.SprayJsonSupport._
 import spark.deploy.{WorkerState, RequestWorkerState}
 import spark.deploy.JsonProtocol._
 
+/**
+ * Web UI server for the standalone worker.
+ */
 private[spark]
 class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
   val RESOURCE_DIR = "spark/deploy/worker/webui"
   val STATIC_RESOURCE_DIR = "spark/deploy/static"
 
-  implicit val timeout = Timeout(1 seconds)
+  implicit val timeout = Timeout(10 seconds)
 
   val handler = {
     get {
@@ -50,5 +53,4 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
      getFromResourceDirectory(RESOURCE_DIR)
     }
   }
-
 }
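In both web UIs the implicit Timeout is what the Akka ask operator (?) picks up when the handlers query the master or worker actor, so those state requests now get 10 seconds instead of 1 before the returned future fails. A small self-contained sketch of that mechanism; the EchoActor is a stand-in for the Spark actors, not part of this commit:

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.dispatch.Await
    import akka.pattern.ask
    import akka.util.Timeout
    import akka.util.duration._

    // Toy actor standing in for the master/worker actor the web UIs talk to.
    class EchoActor extends Actor {
      def receive = { case msg => sender ! msg }
    }

    object TimeoutExample extends App {
      val system = ActorSystem("demo")
      val echo = system.actorOf(Props[EchoActor])

      // Same pattern as the handlers: the implicit Timeout bounds how long the
      // future returned by ? may stay unanswered before it fails.
      implicit val timeout = Timeout(10 seconds)
      val reply = Await.result(echo ? "ping", timeout.duration)
      println(reply)
      system.shutdown()
    }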
 package spark.storage
 
 import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Await
 import akka.pattern.ask
 import akka.util.Timeout
 import akka.util.duration._
+import cc.spray.Directives
 import cc.spray.directives._
 import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.Directives
 import scala.collection.mutable.ArrayBuffer
-import spark.{Logging, SparkContext, SparkEnv}
+import spark.{Logging, SparkContext}
 import spark.util.AkkaUtils
 import spark.Utils
 
+/**
+ * Web UI server for the BlockManager inside each SparkContext.
+ */
 private[spark]
-object BlockManagerUI extends Logging {
+class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
+  extends Directives with Logging {
+
+  val STATIC_RESOURCE_DIR = "spark/deploy/static"
+  implicit val timeout = Timeout(10 seconds)
 
-  /* Starts the Web interface for the BlockManager */
-  def start(actorSystem : ActorSystem, masterActor: ActorRef, sc: SparkContext) {
-    val webUIDirectives = new BlockManagerUIDirectives(actorSystem, masterActor, sc)
+  /** Start a HTTP server to run the Web interface */
+  def start() {
     try {
-      // TODO: This needs to find a random free port to bind to. Unfortunately, there's no way
-      // in spray to do that, so we'll have to rely on something like new ServerSocket()
-      val boundPort = AkkaUtils.startSprayServer(actorSystem, "0.0.0.0",
-        Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt,
-        webUIDirectives.handler, "BlockManagerHTTPServer")
-      logInfo("Started BlockManager web UI at %s:%d".format(Utils.localHostName(), boundPort))
+      val port = if (System.getProperty("spark.ui.port") != null) {
+        System.getProperty("spark.ui.port").toInt
+      } else {
+        // TODO: Unfortunately, it's not possible to pass port 0 to spray and figure out which
+        // random port it bound to, so we have to try to find a local one by creating a socket.
+        Utils.findFreePort()
+      }
+      AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
+      logInfo("Started BlockManager web UI at http://%s:%d".format(Utils.localHostName(), port))
     } catch {
       case e: Exception =>
         logError("Failed to create BlockManager WebUI", e)
@@ -34,58 +43,43 @@ object BlockManagerUI extends Logging {
     }
   }
-}
-
-private[spark]
-class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef,
-  sc: SparkContext) extends Directives {
-
-  val STATIC_RESOURCE_DIR = "spark/deploy/static"
-  implicit val timeout = Timeout(1 seconds)
 
   val handler = {
-
-    get { path("") { completeWith {
-      // Request the current storage status from the Master
-      val future = master ? GetStorageStatus
-      future.map { status =>
-        val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-
-        // Calculate macro-level statistics
-        val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
-        val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
-        val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
-          .reduceOption(_+_).getOrElse(0L)
-
-        val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
-
-        spark.storage.html.index.
-          render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
-      }
-    }}} ~
-    get { path("rdd") { parameter("id") { id => { completeWith {
-      val future = master ? GetStorageStatus
-      future.map { status =>
-        val prefix = "rdd_" + id.toString
-
-        val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-        val filteredStorageStatusList = StorageUtils.
-          filterStorageStatusByPrefix(storageStatusList, prefix)
-
-        val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
-
-        spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
-
-      }
-    }}}}} ~
-    pathPrefix("static") {
-      getFromResourceDirectory(STATIC_RESOURCE_DIR)
-    }
-
+    get {
+      path("") {
+        completeWith {
+          // Request the current storage status from the Master
+          val future = blockManagerMaster ? GetStorageStatus
+          future.map { status =>
+            // Calculate macro-level statistics
+            val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+            val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+            val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+            val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+              .reduceOption(_+_).getOrElse(0L)
+            val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+            spark.storage.html.index.
+              render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
+          }
+        }
+      } ~
+      path("rdd") {
+        parameter("id") { id =>
+          completeWith {
+            val future = blockManagerMaster ? GetStorageStatus
+            future.map { status =>
+              val prefix = "rdd_" + id.toString
+              val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+              val filteredStorageStatusList = StorageUtils.
+                filterStorageStatusByPrefix(storageStatusList, prefix)
+              val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+              spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
+            }
+          }
+        }
+      } ~
+      pathPrefix("static") {
+        getFromResourceDirectory(STATIC_RESOURCE_DIR)
+      }
+    }
   }
 }
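With this change the BlockManager UI no longer reads BLOCKMANAGER_UI_PORT or defaults to 9080: it uses the spark.ui.port system property if set, and otherwise picks a random free port. A minimal sketch of pinning the port from a hypothetical driver program, assuming the property is read exactly as in the code above (it must be set before the SparkContext, and hence the UI, is created):

    import spark.SparkContext

    object UiPortExample {
      def main(args: Array[String]) {
        // Must happen before the SparkContext starts the BlockManagerUI.
        System.setProperty("spark.ui.port", "9080")

        val sc = new SparkContext("local", "UI port example")
        // ... run jobs; the storage UI is served on port 9080 on this host ...
        sc.stop()
      }
    }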
 package spark.util
 
-import akka.actor.{Props, ActorSystemImpl, ActorSystem}
+import akka.actor.{ActorRef, Props, ActorSystemImpl, ActorSystem}
 import com.typesafe.config.ConfigFactory
 import akka.util.duration._
 import akka.pattern.ask
@@ -55,7 +55,7 @@ private[spark] object AkkaUtils {
    * handle requests. Returns the bound port or throws a SparkException on failure.
    */
   def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route,
-      name: String = "HttpServer"): Int = {
+      name: String = "HttpServer"): ActorRef = {
     val ioWorker = new IoWorker(actorSystem).start()
     val httpService = actorSystem.actorOf(Props(new HttpService(route)))
     val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
@@ -67,7 +67,7 @@ private[spark] object AkkaUtils {
     try {
       Await.result(future, timeout) match {
         case bound: HttpServer.Bound =>
-          return bound.endpoint.getPort
+          return server
         case other: Any =>
           throw new SparkException("Failed to bind web UI to port " + port + ": " + other)
       }
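Since callers now choose the port themselves before calling startSprayServer (from spark.ui.port or via Utils.findFreePort), returning the bound port would be redundant, so the method returns the HttpServer ActorRef instead, giving the caller a handle on the server. An illustrative caller under the new signature; it is placed in the spark package only because AkkaUtils and Utils are private[spark], and the route and server name below are made up for the example:

    package spark

    import akka.actor.{ActorRef, ActorSystem}
    import cc.spray.Directives
    import spark.util.AkkaUtils

    // Illustrative only: pick the port up front, keep the returned server actor.
    private[spark] class ExampleWebUI(val actorSystem: ActorSystem) extends Directives {
      def start(): (Int, ActorRef) = {
        val port = Option(System.getProperty("spark.ui.port")).map(_.toInt)
          .getOrElse(Utils.findFreePort())
        val route = pathPrefix("static") { getFromResourceDirectory("spark/deploy/static") }
        val server = AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, route, "ExampleHTTPServer")
        (port, server)
      }
    }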
@@ -5,6 +5,9 @@ import java.util.{TimerTask, Timer}
 import spark.Logging
 
+/**
+ * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
+ */
 class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
   val delaySeconds = MetadataCleaner.getDelaySeconds
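The constructor documented above takes a name and a cleanup callback that receives a time bound. A minimal sketch of the pattern SparkContext itself uses (a TimeStampedHashMap guarded by a MetadataCleaner); clearOldValues is assumed to be the TimeStampedHashMap method that drops entries older than the bound, and ExampleRegistry is a made-up class:

    import spark.util.{MetadataCleaner, TimeStampedHashMap}

    // Illustrative registry: entries older than the cleanup bound are dropped periodically
    // (the cleaner's period comes from MetadataCleaner's configured delay).
    class ExampleRegistry {
      private val entries = new TimeStampedHashMap[Int, String]()

      def put(key: Int, value: String) { entries(key) = value }

      private def cleanup(cleanupTime: Long) {
        entries.clearOldValues(cleanupTime)
      }

      private val metadataCleaner = new MetadataCleaner("ExampleRegistry", cleanup)
    }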