Skip to content
Snippets Groups Projects
Commit 2d7cf6a2 authored by Josh Rosen's avatar Josh Rosen
Browse files

Restructure BlockInfo fields to reduce memory use.

parent aec9bf90
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,7 @@ package org.apache.spark.storage
import java.io.{InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.{HashMap, ArrayBuffer}
import scala.util.Random
......@@ -46,11 +47,17 @@ private[spark] class BlockManager(
maxMemory: Long)
extends Logging {
// initThread is logically a BlockInfo field, but we store it here because
// it's only needed while this block is in the 'pending' state and we want
// to minimize BlockInfo's memory footprint.
private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@volatile var pending: Boolean = true
@volatile var size: Long = -1L
@volatile var initThread: Thread = null
@volatile var failed = false
@volatile var size: Long = -1L // also encodes 'pending' and 'failed' to save space
private def pending: Boolean = size == -1L
private def failed: Boolean = size == -2L
private def initThread: Thread = blockInfoInitThreads.get(this)
setInitThread()
......@@ -58,7 +65,7 @@ private[spark] class BlockManager(
// Set current thread as init thread - waitForReady will not block this thread
// (in case there is non trivial initialization which ends up calling waitForReady as part of
// initialization itself)
this.initThread = Thread.currentThread()
blockInfoInitThreads.put(this, Thread.currentThread())
}
/**
......@@ -66,7 +73,7 @@ private[spark] class BlockManager(
* Return true if the block is available, false otherwise.
*/
def waitForReady(): Boolean = {
if (initThread != Thread.currentThread() && pending) {
if (pending && initThread != Thread.currentThread()) {
synchronized {
while (pending) this.wait()
}
......@@ -76,12 +83,10 @@ private[spark] class BlockManager(
/** Mark this BlockInfo as ready (i.e. block is finished writing) */
def markReady(sizeInBytes: Long) {
require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
assert (pending)
size = sizeInBytes
initThread = null
failed = false
initThread = null
pending = false
blockInfoInitThreads.remove(this)
synchronized {
this.notifyAll()
}
......@@ -90,11 +95,8 @@ private[spark] class BlockManager(
/** Mark this BlockInfo as ready but failed */
def markFailure() {
assert (pending)
size = 0
initThread = null
failed = true
initThread = null
pending = false
size = -2L
blockInfoInitThreads.remove(this)
synchronized {
this.notifyAll()
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment