diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index c79f34342f66ceee3df2aa612dcaaf020fe80703..0d3857f9dd4882d0ac6bda8ff7b2f5858d079033 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -94,6 +94,9 @@ abstract class RDD[T: ClassManifest](
   /** How this RDD depends on any parent RDDs. */
   protected def getDependencies(): List[Dependency[_]] = dependencies_
 
+  /** A friendly name for this RDD */
+  var name: String = null
+
   /** Optionally overridden by subclasses to specify placement preferences. */
   protected def getPreferredLocations(split: Split): Seq[String] = Nil
 
@@ -108,7 +111,13 @@ abstract class RDD[T: ClassManifest](
   /** A unique ID for this RDD (within its SparkContext). */
   val id = sc.newRddId()
 
-  /**
+  /** Assign a name to this RDD */
+  def setName(_name: String) = {
+    name = _name
+    this
+  }
+
+  /**
    * Set this RDD's storage level to persist its values across operations after the first time
    * it is computed. Can only be called once on each RDD.
    */
@@ -119,6 +128,8 @@
         "Cannot change storage level of an RDD after it was already assigned a level")
     }
     storageLevel = newLevel
+    // Register the RDD with the SparkContext
+    sc.persistentRdds(id) = this
     this
   }
 
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bc9fdee8b66ab1a8cb8748e51f876eb4c21db918..4581c0adcf72aa22c603fe8f10a8fb491b5234df 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -1,6 +1,7 @@
 package spark
 
 import java.io._
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
 import java.net.{URI, URLClassLoader}
 import java.lang.ref.WeakReference
@@ -43,6 +44,7 @@ import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import util.{MetadataCleaner, TimeStampedHashMap}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -84,10 +86,20 @@ class SparkContext(
     isLocal)
   SparkEnv.set(env)
 
+  // Start the BlockManager UI
+  spark.storage.BlockManagerUI.start(SparkEnv.get.actorSystem,
+    SparkEnv.get.blockManager.master.masterActor, this)
+
   // Used to store a URL for each static file/jar together with the file's local timestamp
   private[spark] val addedFiles = HashMap[String, Long]()
   private[spark] val addedJars = HashMap[String, Long]()
 
+  // Keeps track of all persisted RDDs
+  private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
+
+  private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
+
+
   // Add each JAR given through the constructor
   jars.foreach { addJar(_) }
 
@@ -493,6 +505,7 @@ class SparkContext(
   /** Shut down the SparkContext. */
   def stop() {
     if (dagScheduler != null) {
+      metadataCleaner.cancel()
       dagScheduler.stop()
       dagScheduler = null
       taskScheduler = null
@@ -635,6 +648,12 @@ class SparkContext(
 
   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
+
+  private[spark] def cleanup(cleanupTime: Long) {
+    val sizeBefore = persistentRdds.size
+    persistentRdds.clearOldValues(cleanupTime)
+    logInfo("persistentRdds " + sizeBefore + " --> " + persistentRdds.size)
+  }
 }
 
 /**
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index a3d8671834dbc676c83e4a739c40462127a93e93..937115e92cf03f6104ee58015bda45326ec2e1f9 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -1,6 +1,10 @@
 package spark.storage
 
-import scala.collection.mutable.ArrayBuffer
+import java.io._
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 
 import akka.actor.{Actor, ActorRef, ActorSystem, Props}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 2216c33b7665194b59f0c911447ea0c283feb722..b31b6286d3685473ea9b512fadd829b026247995 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -68,6 +68,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     case GetMemoryStatus =>
       getMemoryStatus
 
+    case GetStorageStatus =>
+      getStorageStatus
+
     case RemoveBlock(blockId) =>
       removeBlock(blockId)
 
@@ -177,6 +180,14 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     sender ! res
   }
 
+  private def getStorageStatus() {
+    val res = blockManagerInfo.map { case (blockManagerId, info) =>
+      import collection.JavaConverters._
+      StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap)
+    }
+    sender ! res
+  }
+
   private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " "
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index 30483b0b37e5c2c45875b1e77eaa8ef26d4e0dab..3d03ff3a9387408989a014712d234ea12e907a5e 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -98,3 +98,6 @@ case object GetMemoryStatus extends ToBlockManagerMaster
 
 private[spark]
 case object ExpireDeadHosts extends ToBlockManagerMaster
+
+private[spark]
+case object GetStorageStatus extends ToBlockManagerMaster
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1003cc7a611603a072f919ea4c189a6ca78a06e0
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -0,0 +1,88 @@
+package spark.storage
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.util.duration._
+import cc.spray.Directives
+import cc.spray.directives._
+import cc.spray.typeconversion.TwirlSupport._
+import scala.collection.mutable.ArrayBuffer
+import spark.{Logging, SparkContext, SparkEnv}
+import spark.util.AkkaUtils
+
+
+private[spark]
+object BlockManagerUI extends Logging {
+
+  /* Starts the Web interface for the BlockManager */
+  def start(actorSystem: ActorSystem, masterActor: ActorRef, sc: SparkContext) {
+    val webUIDirectives = new BlockManagerUIDirectives(actorSystem, masterActor, sc)
+    try {
+      logInfo("Starting BlockManager WebUI.")
+      val port = Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt
+      AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port,
+        webUIDirectives.handler, "BlockManagerHTTPServer")
+    } catch {
+      case e: Exception =>
+        logError("Failed to create BlockManager WebUI", e)
+        System.exit(1)
+    }
+  }
+
+}
+
+
+private[spark]
+class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef,
+  sc: SparkContext) extends Directives {
+
+  val STATIC_RESOURCE_DIR = "spark/deploy/static"
+  implicit val timeout = Timeout(1 seconds)
+
+  val handler = {
+
+    get { path("") { completeWith {
+      // Request the current storage status from the Master
+      val future = master ? GetStorageStatus
+      future.map { status =>
+        val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+
+        // Calculate macro-level statistics
+        val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+        val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+        val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+          .reduceOption(_+_).getOrElse(0L)
+
+        val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+
+        spark.storage.html.index.
+          render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
+      }
+    }}} ~
+    get { path("rdd") { parameter("id") { id => { completeWith {
+      val future = master ? GetStorageStatus
+      future.map { status =>
+        val prefix = "rdd_" + id.toString
+
+        val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
+        val filteredStorageStatusList = StorageUtils.
+          filterStorageStatusByPrefix(storageStatusList, prefix)
+
+        val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+
+        spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
+      }
+    }}}}} ~
+    pathPrefix("static") {
+      getFromResourceDirectory(STATIC_RESOURCE_DIR)
+    }
+
+  }
+
+}
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index d1d1c61c1cb37fb98f13460d23095536f197f24e..3b5a77ab228bb2df833cd04fa6e22e8ba93dd6dd 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -80,6 +80,14 @@
     "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
 
   override def hashCode(): Int = toInt * 41 + replication
 
+  def description: String = {
+    var result = ""
+    result += (if (useDisk) "Disk " else "")
+    result += (if (useMemory) "Memory " else "")
+    result += (if (deserialized) "Deserialized " else "Serialized ")
+    result += "%sx Replicated".format(replication)
+    result
+  }
 }
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a10e3a95c60d3a0f4288adf0da42827276f12c18
--- /dev/null
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -0,0 +1,78 @@
+package spark.storage
+
+import spark.SparkContext
+import BlockManagerMasterActor.BlockStatus
+
+private[spark]
+case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
+  blocks: Map[String, BlockStatus]) {
+
+  def memUsed(blockPrefix: String = "") = {
+    blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.memSize).
+      reduceOption(_+_).getOrElse(0L)
+  }
+
+  def diskUsed(blockPrefix: String = "") = {
+    blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.diskSize).
+      reduceOption(_+_).getOrElse(0L)
+  }
+
+  def memRemaining: Long = maxMem - memUsed()
+
+}
+
+case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
+  numPartitions: Int, memSize: Long, diskSize: Long)
+
+
+/* Helper methods for storage-related objects */
+private[spark]
+object StorageUtils {
+
+  /* Given the current storage status of the BlockManager, returns information for each RDD */
+  def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus],
+    sc: SparkContext): Array[RDDInfo] = {
+    rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc)
+  }
+
+  /* Given a list of BlockStatus objects, returns information for each RDD */
+  def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
+    sc: SparkContext): Array[RDDInfo] = {
+    // Find all RDD blocks (ignore broadcast variables)
+    val rddBlocks = infos.filterKeys(_.startsWith("rdd"))
+
+    // Group by rddId, ignoring the partition index in the block name
+    val groupedRddBlocks = rddBlocks.groupBy { case (k, v) =>
+      k.substring(0, k.lastIndexOf('_'))
+    }.mapValues(_.values.toArray)
+
+    // For each RDD, generate an RDDInfo object
+    groupedRddBlocks.map { case (rddKey, rddBlocks) =>
+
+      // Add up memory and disk sizes
+      val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
+      val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _)
+
+      // Find the id of the RDD, e.g. rdd_1 => 1
+      val rddId = rddKey.split("_").last.toInt
+      // Get the friendly name for the rdd, if available.
+      val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey)
+      val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel
+
+      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize)
+    }.toArray
+  }
+
+  /* Removes all BlockStatus objects that are not part of a block prefix */
+  def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus],
+    prefix: String): Array[StorageStatus] = {
+
+    storageStatusList.map { status =>
+      val newBlocks = status.blocks.filterKeys(_.startsWith(prefix))
+      //val newRemainingMem = status.maxMem - newBlocks.values.map(_.memSize).reduce(_ + _)
+      StorageStatus(status.blockManagerId, status.maxMem, newBlocks)
+    }
+
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index fbd0ff46bffe36fddf17a8056633c4cba46a6544..ff2c3079be07d2c34bf0b6a1da75c2b5b4181cf9 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -54,12 +54,13 @@ private[spark] object AkkaUtils {
    * Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to
    * handle requests. Throws a SparkException if this fails.
    */
-  def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) {
+  def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route,
+    name: String = "HttpServer") {
     val ioWorker = new IoWorker(actorSystem).start()
     val httpService = actorSystem.actorOf(Props(new HttpService(route)))
     val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
     val server = actorSystem.actorOf(
-      Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
+      Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = name)
     actorSystem.registerOnTermination { ioWorker.stop() }
     val timeout = 3.seconds
     val future = server.ask(HttpServer.Bind(ip, port))(timeout)
diff --git a/core/src/main/twirl/spark/deploy/common/layout.scala.html b/core/src/main/twirl/spark/common/layout.scala.html
similarity index 100%
rename from core/src/main/twirl/spark/deploy/common/layout.scala.html
rename to core/src/main/twirl/spark/common/layout.scala.html
diff --git a/core/src/main/twirl/spark/deploy/master/index.scala.html b/core/src/main/twirl/spark/deploy/master/index.scala.html
index 18c32e5a1f094b35c8dd0107eeadffcd8f09a63a..285645c38989504920f0f049475c4788f80c037c 100644
--- a/core/src/main/twirl/spark/deploy/master/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/index.scala.html
@@ -2,7 +2,7 @@
 @import spark.deploy.master._
 @import spark.Utils
 
-@spark.deploy.common.html.layout(title = "Spark Master on " + state.uri) {
+@spark.common.html.layout(title = "Spark Master on " + state.uri) {
 
   <!-- Cluster Details -->
   <div class="row">
diff --git a/core/src/main/twirl/spark/deploy/master/job_details.scala.html b/core/src/main/twirl/spark/deploy/master/job_details.scala.html
index dcf41c28f26f56fa289f7ef7a71c9067b47d1baf..d02a51b214180c26df713d583b23d9fc87db7872 100644
--- a/core/src/main/twirl/spark/deploy/master/job_details.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_details.scala.html
@@ -1,6 +1,6 @@
 @(job: spark.deploy.master.JobInfo)
 
-@spark.deploy.common.html.layout(title = "Job Details") {
+@spark.common.html.layout(title = "Job Details") {
 
   <!-- Job Details -->
   <div class="row">
diff --git a/core/src/main/twirl/spark/deploy/worker/index.scala.html b/core/src/main/twirl/spark/deploy/worker/index.scala.html
index b247307dab06793be7877bee00ffd1255bd4da8c..1d703dae58ccc3b5621f82e12c915648e38423f2 100644
--- a/core/src/main/twirl/spark/deploy/worker/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/index.scala.html
@@ -1,8 +1,7 @@
 @(worker: spark.deploy.WorkerState)
-
 @import spark.Utils
 
-@spark.deploy.common.html.layout(title = "Spark Worker on " + worker.uri) {
+@spark.common.html.layout(title = "Spark Worker on " + worker.uri) {
 
   <!-- Worker Details -->
   <div class="row">
diff --git a/core/src/main/twirl/spark/storage/index.scala.html b/core/src/main/twirl/spark/storage/index.scala.html
new file mode 100644
index 0000000000000000000000000000000000000000..2b337f61339b213bea1ac62941363708373e4c58
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/index.scala.html
@@ -0,0 +1,40 @@
+@(maxMem: Long, remainingMem: Long, diskSpaceUsed: Long, rdds: Array[spark.storage.RDDInfo], storageStatusList: Array[spark.storage.StorageStatus])
+@import spark.Utils
+
+@spark.common.html.layout(title = "Storage Dashboard") {
+
+  <!-- High-Level Information -->
+  <div class="row">
+    <div class="span12">
+      <ul class="unstyled">
+        <li><strong>Memory:</strong>
+          @{Utils.memoryBytesToString(maxMem - remainingMem)} Used
+          (@{Utils.memoryBytesToString(remainingMem)} Available) </li>
+        <li><strong>Disk:</strong> @{Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
+      </ul>
+    </div>
+  </div>
+
+  <hr/>
+
+  <!-- RDD Summary -->
+  <div class="row">
+    <div class="span12">
+      <h3> RDD Summary </h3>
+      <br/>
+      @rdd_table(rdds)
+    </div>
+  </div>
+
+  <hr/>
+
+  <!-- Worker Summary -->
+  <div class="row">
+    <div class="span12">
+      <h3> Worker Summary </h3>
+      <br/>
+      @worker_table(storageStatusList)
+    </div>
+  </div>
+
+}
\ No newline at end of file
diff --git a/core/src/main/twirl/spark/storage/rdd.scala.html b/core/src/main/twirl/spark/storage/rdd.scala.html
new file mode 100644
index 0000000000000000000000000000000000000000..ac7f8c981fa830f62c64acb30d64067eec1ab793
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/rdd.scala.html
@@ -0,0 +1,77 @@
+@(rddInfo: spark.storage.RDDInfo, storageStatusList: Array[spark.storage.StorageStatus])
+@import spark.Utils
+
+@spark.common.html.layout(title = "RDD Info ") {
+
+  <!-- High-Level Information -->
+  <div class="row">
+    <div class="span12">
+      <ul class="unstyled">
+        <li>
+          <strong>Storage Level:</strong>
+          @(rddInfo.storageLevel.description)
+        </li>
+        <li>
+          <strong>Partitions:</strong>
+          @(rddInfo.numPartitions)
+        </li>
+        <li>
+          <strong>Memory Size:</strong>
+          @{Utils.memoryBytesToString(rddInfo.memSize)}
+        </li>
+        <li>
+          <strong>Disk Size:</strong>
+          @{Utils.memoryBytesToString(rddInfo.diskSize)}
+        </li>
+      </ul>
+    </div>
+  </div>
+
+  <hr/>
+
+  <!-- RDD Summary -->
+  <div class="row">
+    <div class="span12">
+      <h3> RDD Summary </h3>
+      <br/>
+
+      <!-- Block Table Summary -->
+      <table class="table table-bordered table-striped table-condensed sortable">
+        <thead>
+          <tr>
+            <th>Block Name</th>
+            <th>Storage Level</th>
+            <th>Size in Memory</th>
+            <th>Size on Disk</th>
+          </tr>
+        </thead>
+        <tbody>
+          @storageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1).map { case (k,v) =>
+            <tr>
+              <td>@k</td>
+              <td>
+                @(v.storageLevel.description)
+              </td>
+              <td>@{Utils.memoryBytesToString(v.memSize)}</td>
+              <td>@{Utils.memoryBytesToString(v.diskSize)}</td>
+            </tr>
+          }
+        </tbody>
+      </table>
+
+    </div>
+  </div>
+
+  <hr/>
+
+  <!-- Worker Table -->
+  <div class="row">
+    <div class="span12">
+      <h3> Worker Summary </h3>
+      <br/>
+      @worker_table(storageStatusList, "rdd_" + rddInfo.id)
+    </div>
+  </div>
+
+}
\ No newline at end of file
diff --git a/core/src/main/twirl/spark/storage/rdd_table.scala.html b/core/src/main/twirl/spark/storage/rdd_table.scala.html
new file mode 100644
index 0000000000000000000000000000000000000000..af801cf229569fb3110f599f64810063dc695156
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/rdd_table.scala.html
@@ -0,0 +1,30 @@
+@(rdds: Array[spark.storage.RDDInfo])
+@import spark.Utils
+
+<table class="table table-bordered table-striped table-condensed sortable">
+  <thead>
+    <tr>
+      <th>RDD Name</th>
+      <th>Storage Level</th>
+      <th>Partitions</th>
+      <th>Size in Memory</th>
+      <th>Size on Disk</th>
+    </tr>
+  </thead>
+  <tbody>
+    @for(rdd <- rdds) {
+      <tr>
+        <td>
+          <a href="rdd?id=@(rdd.id)">
+            @rdd.name
+          </a>
+        </td>
+        <td>@(rdd.storageLevel.description)</td>
+        <td>@rdd.numPartitions</td>
+        <td>@{Utils.memoryBytesToString(rdd.memSize)}</td>
+        <td>@{Utils.memoryBytesToString(rdd.diskSize)}</td>
+      </tr>
+    }
+  </tbody>
+</table>
\ No newline at end of file
diff --git a/core/src/main/twirl/spark/storage/worker_table.scala.html b/core/src/main/twirl/spark/storage/worker_table.scala.html
new file mode 100644
index 0000000000000000000000000000000000000000..d54b8de4cc81394149cceac0d52b965483e95a7c
--- /dev/null
+++ b/core/src/main/twirl/spark/storage/worker_table.scala.html
@@ -0,0 +1,24 @@
+@(workersStatusList: Array[spark.storage.StorageStatus], prefix: String = "")
+@import spark.Utils
+
+<table class="table table-bordered table-striped table-condensed sortable">
+  <thead>
+    <tr>
+      <th>Host</th>
+      <th>Memory Usage</th>
+      <th>Disk Usage</th>
+    </tr>
+  </thead>
+  <tbody>
+    @for(status <- workersStatusList) {
+      <tr>
+        <td>@(status.blockManagerId.ip + ":" + status.blockManagerId.port)</td>
+        <td>
+          @(Utils.memoryBytesToString(status.memUsed(prefix)))
+          (@(Utils.memoryBytesToString(status.memRemaining)) Total Available)
+        </td>
+        <td>@(Utils.memoryBytesToString(status.diskUsed(prefix)))</td>
+      </tr>
+    }
+  </tbody>
+</table>
\ No newline at end of file
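
Usage sketch (not part of the patch): with this change applied, a cached RDD can be given a friendly name via the new setName method, and it then shows up under that name on the storage dashboard instead of its raw block key. The input path and RDD name below are placeholders, and sc is assumed to be an existing SparkContext (e.g. in spark-shell); StorageLevel.MEMORY_ONLY is one of the stock storage levels.

    import spark.storage.StorageLevel

    // Name the RDD before caching so the dashboard lists "My Input"
    // rather than the generated block key (e.g. "rdd_3").
    val lines = sc.textFile("hdfs://...")  // placeholder path; any input works
    val cached = lines.setName("My Input").persist(StorageLevel.MEMORY_ONLY)
    cached.count()  // materialize the blocks so they register with the BlockManager

The dashboard is served on the driver at port 9080 by default (override with the BLOCKMANAGER_UI_PORT environment variable); per-RDD detail pages are at /rdd?id=<rddId>.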