Skip to content
Snippets Groups Projects
Commit d290e964 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #281 from rxin/memreport

Added a method to report slave memory status; force serialize accumulator update in local mode.
parents 0bd20c63 3b971246
No related branches found
No related tags found
No related merge requests found
......@@ -45,7 +45,6 @@ import spark.scheduler.TaskScheduler
import spark.scheduler.local.LocalScheduler
import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.BlockManagerMaster
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
......@@ -199,7 +198,7 @@ class SparkContext(
parallelize(seq, numSlices)
}
/**
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
......@@ -400,7 +399,7 @@ class SparkContext(
new Accumulable(initialValue, param)
}
/**
/**
* Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
* reading it in distributed functions. The variable will be sent to each cluster only once.
*/
......@@ -426,6 +425,16 @@ class SparkContext(
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
}
/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
*/
def getSlavesMemoryStatus: Map[String, (Long, Long)] = {
env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
(blockManagerId.ip + ":" + blockManagerId.port, mem)
}
}
/**
* Clear the job's list of files added by `addFile` so that they do not get donwloaded to
* any new nodes.
......
......@@ -30,12 +30,12 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
val classLoader = new ExecutorURLClassLoader(Array(), Thread.currentThread.getContextClassLoader)
// TODO: Need to take into account stage priority in scheduling
override def start() { }
override def setListener(listener: TaskSchedulerListener) {
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
}
......@@ -78,7 +78,8 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
// on in development (so when users move their local Spark programs
// to the cluster, they don't get surprised by serialization errors).
val resultToReturn = ser.deserialize[Any](ser.serialize(result))
val accumUpdates = Accumulators.values
val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
ser.serialize(Accumulators.values))
logInfo("Finished task " + idInJob)
listener.taskEnded(task, Success, resultToReturn, accumUpdates)
} catch {
......@@ -126,7 +127,7 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
}
}
}
override def stop() {
threadPool.shutdownNow()
}
......
......@@ -71,76 +71,79 @@ object HeartBeat {
Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}
private[spark]
case class GetLocations(blockId: String) extends ToBlockManagerMaster
private[spark]
case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
private[spark]
case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
private[spark]
case class RemoveHost(host: String) extends ToBlockManagerMaster
private[spark]
case object StopBlockManagerMaster extends ToBlockManagerMaster
private[spark]
case object GetMemoryStatus extends ToBlockManagerMaster
private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val maxMem: Long) {
private var lastSeenMs = timeMs
private var remainingMem = maxMem
private val blocks = new JHashMap[String, StorageLevel]
private var _lastSeenMs = timeMs
private var _remainingMem = maxMem
private val _blocks = new JHashMap[String, StorageLevel]
logInfo("Registering block manager %s:%d with %s RAM".format(
blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
def updateLastSeenMs() {
lastSeenMs = System.currentTimeMillis() / 1000
_lastSeenMs = System.currentTimeMillis() / 1000
}
def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
: Unit = synchronized {
updateLastSeenMs()
if (blocks.containsKey(blockId)) {
if (_blocks.containsKey(blockId)) {
// The block exists on the slave already.
val originalLevel: StorageLevel = blocks.get(blockId)
val originalLevel: StorageLevel = _blocks.get(blockId)
if (originalLevel.useMemory) {
remainingMem += memSize
_remainingMem += memSize
}
}
if (storageLevel.isValid) {
// isValid means it is either stored in-memory or on-disk.
blocks.put(blockId, storageLevel)
_blocks.put(blockId, storageLevel)
if (storageLevel.useMemory) {
remainingMem -= memSize
_remainingMem -= memSize
logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
Utils.memoryBytesToString(remainingMem)))
Utils.memoryBytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
logInfo("Added %s on disk on %s:%d (size: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
}
} else if (blocks.containsKey(blockId)) {
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
val originalLevel: StorageLevel = blocks.get(blockId)
blocks.remove(blockId)
val originalLevel: StorageLevel = _blocks.get(blockId)
_blocks.remove(blockId)
if (originalLevel.useMemory) {
remainingMem += memSize
_remainingMem += memSize
logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
Utils.memoryBytesToString(remainingMem)))
Utils.memoryBytesToString(_remainingMem)))
}
if (originalLevel.useDisk) {
logInfo("Removed %s on %s:%d on disk (size: %s)".format(
......@@ -149,20 +152,14 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
}
def getLastSeenMs: Long = {
return lastSeenMs
}
def getRemainedMem: Long = {
return remainingMem
}
def remainingMem: Long = _remainingMem
override def toString: String = {
return "BlockManagerInfo " + timeMs + " " + remainingMem
}
def lastSeenMs: Long = _lastSeenMs
override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
def clear() {
blocks.clear()
_blocks.clear()
}
}
......@@ -170,7 +167,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
initLogging()
def removeHost(host: String) {
logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
......@@ -197,7 +194,10 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case GetPeers(blockManagerId, size) =>
getPeersDeterministic(blockManagerId, size)
/*getPeers(blockManagerId, size)*/
case GetMemoryStatus =>
getMemoryStatus
case RemoveHost(host) =>
removeHost(host)
sender ! true
......@@ -207,10 +207,18 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! true
context.stop(self)
case other =>
case other =>
logInfo("Got unknown message: " + other)
}
// Return a map from the block manager id to max memory and remaining memory.
private def getMemoryStatus() {
val res = blockManagerInfo.map { case(blockManagerId, info) =>
(blockManagerId, (info.maxMem, info.remainingMem))
}.toMap
sender ! res
}
private def register(blockManagerId: BlockManagerId, maxMemSize: Long) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
......@@ -224,25 +232,25 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
private def heartBeat(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " " + blockId + " "
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
var locations: HashSet[BlockManagerId] = null
if (blockInfo.containsKey(blockId)) {
locations = blockInfo.get(blockId)._2
......@@ -250,19 +258,19 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
locations = new HashSet[BlockManagerId]
blockInfo.put(blockId, (storageLevel.replication, locations))
}
if (storageLevel.isValid) {
locations += blockManagerId
} else {
locations.remove(blockManagerId)
}
if (locations.size == 0) {
blockInfo.remove(blockId)
}
sender ! true
}
private def getLocations(blockId: String) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockId + " "
......@@ -270,7 +278,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
if (blockInfo.containsKey(blockId)) {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
res.appendAll(blockInfo.get(blockId)._2)
logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
+ Utils.getUsedTimeMs(startTimeMs))
sender ! res.toSeq
} else {
......@@ -279,7 +287,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! res
}
}
private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
def getLocations(blockId: String): Seq[BlockManagerId] = {
val tmp = blockId
......@@ -295,7 +303,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
return res.toSeq
}
}
logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
for (blockId <- blockIds) {
......@@ -316,7 +324,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
sender ! res.toSeq
}
private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
......@@ -362,7 +370,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
logInfo("Connecting to BlockManagerMaster: " + url)
masterActor = actorSystem.actorFor(url)
}
def stop() {
if (masterActor != null) {
communicate(StopBlockManagerMaster)
......@@ -389,7 +397,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
throw new SparkException("Error reply received from BlockManagerMaster")
}
}
def notifyADeadHost(host: String) {
communicate(RemoveHost(host + ":" + DEFAULT_MANAGER_PORT))
logInfo("Removed " + host + " successfully in notifyADeadHost")
......@@ -409,7 +417,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncRegisterBlockManager 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
try {
communicate(msg)
logInfo("BlockManager registered successfully @ syncRegisterBlockManager")
......@@ -421,19 +429,19 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return false
}
}
def mustHeartBeat(msg: HeartBeat) {
while (! syncHeartBeat(msg)) {
logWarning("Failed to send heartbeat" + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
}
}
def syncHeartBeat(msg: HeartBeat): Boolean = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
try {
communicate(msg)
logDebug("Heartbeat sent successfully")
......@@ -445,7 +453,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return false
}
}
def mustGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
var res = syncGetLocations(msg)
while (res == null) {
......@@ -455,7 +463,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
return res
}
def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
......@@ -488,13 +496,13 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
return res
}
def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
Seq[Seq[BlockManagerId]] = {
val startTimeMs = System.currentTimeMillis
val tmp = " msg " + msg + " "
logDebug("Got in syncGetLocationsMultipleBlockIds 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
try {
val answer = askMaster(msg).asInstanceOf[Seq[Seq[BlockManagerId]]]
if (answer != null) {
......@@ -512,7 +520,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return null
}
}
def mustGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
var res = syncGetPeers(msg)
while ((res == null) || (res.length != msg.size)) {
......@@ -520,10 +528,10 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
res = syncGetPeers(msg)
}
return res
}
def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
val startTimeMs = System.currentTimeMillis
val tmp = " msg " + msg + " "
......@@ -545,4 +553,8 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return null
}
}
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]]
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment