Skip to content
Snippets Groups Projects
Commit c2dd6bcd authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Merge pull request #279 from aarondav/shuffle-cleanup0

Clean up shuffle files once their metadata is gone

Previously, we would only clean the in-memory metadata for consolidated shuffle files.

Additionally, fixes a bug where the Metadata Cleaner was ignoring type-specific TTLs.
parents 3bf7c708 0647ec97
No related branches found
No related tags found
No related merge requests found
......@@ -70,10 +70,16 @@ class ShuffleBlockManager(blockManager: BlockManager) {
* Contains all the state related to a particular shuffle. This includes a pool of unused
* ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
*/
private class ShuffleState() {
private class ShuffleState(val numBuckets: Int) {
val nextFileId = new AtomicInteger(0)
val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
/**
* The mapIds of all map tasks completed on this Executor for this shuffle.
* NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
*/
val completedMapTasks = new ConcurrentLinkedQueue[Int]()
}
type ShuffleId = Int
......@@ -84,7 +90,7 @@ class ShuffleBlockManager(blockManager: BlockManager) {
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
new ShuffleWriterGroup {
shuffleStates.putIfAbsent(shuffleId, new ShuffleState())
shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
private val shuffleState = shuffleStates(shuffleId)
private var fileGroup: ShuffleFileGroup = null
......@@ -109,6 +115,8 @@ class ShuffleBlockManager(blockManager: BlockManager) {
fileGroup.recordMapOutput(mapId, offsets)
}
recycleFileGroup(fileGroup)
} else {
shuffleState.completedMapTasks.add(mapId)
}
}
......@@ -154,7 +162,18 @@ class ShuffleBlockManager(blockManager: BlockManager) {
}
private def cleanup(cleanupTime: Long) {
shuffleStates.clearOldValues(cleanupTime)
shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => {
if (consolidateShuffleFiles) {
for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
file.delete()
}
} else {
for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
blockManager.diskBlockManager.getFile(blockId).delete()
}
}
})
}
}
......
......@@ -27,7 +27,7 @@ import org.apache.spark.Logging
class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging {
val name = cleanerType.toString
private val delaySeconds = MetadataCleaner.getDelaySeconds
private val delaySeconds = MetadataCleaner.getDelaySeconds(cleanerType)
private val periodSeconds = math.max(10, delaySeconds / 10)
private val timer = new Timer(name + " cleanup timer", true)
......
......@@ -104,19 +104,28 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging {
def toMap: immutable.Map[A, B] = iterator.toMap
/**
* Removes old key-value pairs that have timestamp earlier than `threshTime`
* Removes old key-value pairs that have timestamp earlier than `threshTime`,
* calling the supplied function on each such entry before removing.
*/
def clearOldValues(threshTime: Long) {
def clearOldValues(threshTime: Long, f: (A, B) => Unit) {
val iterator = internalMap.entrySet().iterator()
while(iterator.hasNext) {
while (iterator.hasNext) {
val entry = iterator.next()
if (entry.getValue._2 < threshTime) {
f(entry.getKey, entry.getValue._1)
logDebug("Removing key " + entry.getKey)
iterator.remove()
}
}
}
/**
* Removes old key-value pairs that have timestamp earlier than `threshTime`
*/
def clearOldValues(threshTime: Long) {
clearOldValues(threshTime, (_, _) => ())
}
private def currentTime: Long = System.currentTimeMillis()
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment