Skip to content
Snippets Groups Projects
Commit 846b1cf5 authored by Josh Rosen
Browse files

Store fewer BlockInfo fields for shuffle blocks.

parent 2d7cf6a2
No related branches found
No related tags found
No related merge requests found
......@@ -53,10 +53,15 @@ private[spark] class BlockManager(
// to minimize BlockInfo's memory footprint.
private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@volatile var size: Long = -1L // also encodes 'pending' and 'failed' to save space
private def pending: Boolean = size == -1L
private def failed: Boolean = size == -2L
private val BLOCK_PENDING: Long = -1L
private val BLOCK_FAILED: Long = -2L
private trait BlockInfo {
def level: StorageLevel
def tellMaster: Boolean
@volatile var size: Long = BLOCK_PENDING // also encodes 'pending' and 'failed' to save space
private def pending: Boolean = size == BLOCK_PENDING
private def failed: Boolean = size == BLOCK_FAILED
private def initThread: Thread = blockInfoInitThreads.get(this)
setInitThread()
......@@ -95,7 +100,7 @@ private[spark] class BlockManager(
/** Mark this BlockInfo as ready but failed */
def markFailure() {
assert (pending)
size = -2L
size = BLOCK_FAILED
blockInfoInitThreads.remove(this)
synchronized {
this.notifyAll()
......@@ -103,6 +108,19 @@ private[spark] class BlockManager(
}
}
// All shuffle blocks have the same `level` and `tellMaster` properties,
// so we can save space by not storing them in each instance:
/**
 * BlockInfo implementation for shuffle blocks.
 *
 * `level` always evaluates to `StorageLevel.DISK_ONLY` and `tellMaster` always
 * evaluates to `false`; neither is taken as a constructor argument.
 */
private class ShuffleBlockInfo extends BlockInfo {
// These need to be defined using 'def' instead of 'val' in order for
// the compiler to eliminate the fields:
// (a 'val' here would add a stored field to every instance, defeating the purpose)
def level: StorageLevel = StorageLevel.DISK_ONLY
def tellMaster: Boolean = false
}
/**
 * BlockInfo implementation whose `level` and `tellMaster` are supplied by the caller
 * and held as constructor `val`s.
 *
 * @param level      storage level reported by this BlockInfo
 * @param tellMaster value reported by this BlockInfo's `tellMaster` member
 */
private class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean) extends BlockInfo {
// Intentionally left blank: the constructor vals implement the abstract
// `level` and `tellMaster` members declared by BlockInfo.
}
val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
......@@ -528,7 +546,7 @@ private[spark] class BlockManager(
if (shuffleBlockManager.consolidateShuffleFiles) {
diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
}
val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
val myInfo = new ShuffleBlockInfo()
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.fileSegment().length)
})
......@@ -562,7 +580,7 @@ private[spark] class BlockManager(
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
val myInfo = {
val tinfo = new BlockInfo(level, tellMaster)
val tinfo = new BlockInfoImpl(level, tellMaster)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment