Commit 2f523fa0 authored by Liang-Chi Hsieh, committed by Mridul Muralidharan

[SPARK-19244][CORE] Sort MemoryConsumers according to their memory usage when spilling

## What changes were proposed in this pull request?

In `TaskMemoryManager`, when we acquire memory by calling `acquireExecutionMemory` and cannot get the required amount, we try to spill other memory consumers.

Currently, we simply iterate over the memory consumers stored in a hash set, so the consumers are normally visited in the same order each time.

The first issue is that we might spill more consumers than necessary. For example, suppose consumer 1 uses 10MB and consumer 2 uses 50MB, and consumer 3 then requests 100MB but only 60MB can be acquired, so spilling is needed. We might spill both consumer 1 and consumer 2, even though spilling consumer 2 alone would already yield the required 100MB.
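
To make the arithmetic concrete, here is a minimal standalone sketch (hypothetical class and variable names, not Spark code) that simulates the old behaviour with the numbers above: with a fixed iteration order, consumer 1 is spilled first even though it cannot satisfy the request on its own, so consumer 2 ends up being spilled as well.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SpillOrderSketch {
  public static void main(String[] args) {
    final long mb = 1L << 20;
    long required = 100 * mb;   // consumer 3 requests 100MB
    long got = 60 * mb;         // only 60MB can be acquired without spilling

    // Iteration order mimics the (stable) order of the hash set of consumers.
    Map<String, Long> used = new LinkedHashMap<>();
    used.put("consumer1", 10 * mb);
    used.put("consumer2", 50 * mb);

    for (Map.Entry<String, Long> e : used.entrySet()) {
      if (got >= required) break;
      System.out.println("spilling " + e.getKey());
      got += e.getValue();      // assume spilling frees everything the consumer used
    }
    // Prints consumer1 and consumer2, although spilling only consumer2
    // (60MB + 50MB >= 100MB) would have been enough.
  }
}
```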

The second issue is that the same consumer can be spilled over and over. Suppose consumer 1 is spilled the first time spilling happens, and after a while it uses only 5MB. When consumer 4 later requests memory and spilling is needed again, we spill consumer 1 once more because the consumers are iterated in the same order. As a result, consumer 1 produces many small spill files.

This patch changes how the memory consumers are iterated. It sorts them by memory usage, so a consumer using more memory is spilled before one using less. Once a consumer has been spilled, even if it acquires a little memory again, it will not be chosen for spilling the next time as long as other consumers are using more memory than it.
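
The selection rule can be illustrated with a small standalone sketch (hypothetical names, not the actual `TaskMemoryManager` code), using the same numbers as the example above: prefer the consumer whose usage is the smallest value still covering the remaining need (`TreeMap.ceilingEntry`), otherwise fall back to the largest consumer (`TreeMap.lastEntry`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PickConsumerSketch {
  public static void main(String[] args) {
    // Consumers keyed by how much memory they currently use (MB).
    TreeMap<Long, List<String>> sortedConsumers = new TreeMap<>();
    sortedConsumers.put(10L, new ArrayList<>(Arrays.asList("consumer1")));
    sortedConsumers.put(50L, new ArrayList<>(Arrays.asList("consumer2")));

    long required = 100L;  // in MB, as in the example above
    long got = 60L;

    // Smallest consumer that alone covers the remaining 40MB...
    Map.Entry<Long, List<String>> entry = sortedConsumers.ceilingEntry(required - got);
    if (entry == null) {
      // ...or, if nobody covers it alone, the largest consumer.
      entry = sortedConsumers.lastEntry();
    }
    System.out.println("spill " + entry.getValue());  // [consumer2]: 60MB + 50MB >= 100MB
  }
}
```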

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16603 from viirya/sort-memoryconsumer-when-spill.
parent 52d4f619
@@ -20,8 +20,12 @@ package org.apache.spark.memory;
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -144,23 +148,49 @@ public class TaskMemoryManager {
       // spilling, avoid to have too many spilled files.
       if (got < required) {
         // Call spill() on other consumers to release memory
+        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
+        // which is just spilled in last few times and re-spilling on it will produce many small
+        // spill files.
+        TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {
           if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            try {
-              long released = c.spill(required - got, consumer);
-              if (released > 0) {
-                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                  Utils.bytesToString(released), c, consumer);
-                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-                if (got >= required) {
-                  break;
-                }
-              }
-            } catch (IOException e) {
-              logger.error("error while calling spill() on " + c, e);
-              throw new OutOfMemoryError("error while calling spill() on " + c + " : "
-                + e.getMessage());
-            }
+            long key = c.getUsed();
+            List<MemoryConsumer> list = sortedConsumers.get(key);
+            if (list == null) {
+              list = new ArrayList<>(1);
+              sortedConsumers.put(key, list);
+            }
+            list.add(c);
           }
         }
+        while (!sortedConsumers.isEmpty()) {
+          // Get the consumer using the least memory more than the remaining required memory.
+          Map.Entry<Long, List<MemoryConsumer>> currentEntry =
+            sortedConsumers.ceilingEntry(required - got);
+          // No consumer has used memory more than the remaining required memory.
+          // Get the consumer of largest used memory.
+          if (currentEntry == null) {
+            currentEntry = sortedConsumers.lastEntry();
+          }
+          List<MemoryConsumer> cList = currentEntry.getValue();
+          MemoryConsumer c = cList.remove(cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
+          }
+          try {
+            long released = c.spill(required - got, consumer);
+            if (released > 0) {
+              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
+                Utils.bytesToString(released), c, consumer);
+              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
+              if (got >= required) {
+                break;
+              }
+            }
+          } catch (IOException e) {
+            logger.error("error while calling spill() on " + c, e);
+            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
+              + e.getMessage());
+          }
+        }
       }
@@ -109,6 +109,41 @@ public class TaskMemoryManagerSuite {
     Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
   }
 
+  @Test
+  public void cooperativeSpilling2() {
+    final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
+    memoryManager.limit(100);
+    final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
+
+    TestMemoryConsumer c1 = new TestMemoryConsumer(manager);
+    TestMemoryConsumer c2 = new TestMemoryConsumer(manager);
+    TestMemoryConsumer c3 = new TestMemoryConsumer(manager);
+
+    c1.use(20);
+    Assert.assertEquals(20, c1.getUsed());
+    c2.use(80);
+    Assert.assertEquals(80, c2.getUsed());
+    c3.use(80);
+    Assert.assertEquals(20, c1.getUsed());  // c1: not spilled
+    Assert.assertEquals(0, c2.getUsed());   // c2: spilled as it has required size of memory
+    Assert.assertEquals(80, c3.getUsed());
+
+    c2.use(80);
+    Assert.assertEquals(20, c1.getUsed());  // c1: not spilled
+    Assert.assertEquals(0, c3.getUsed());   // c3: spilled as it has required size of memory
+    Assert.assertEquals(80, c2.getUsed());
+
+    c3.use(10);
+    Assert.assertEquals(0, c1.getUsed());   // c1: spilled as it has required size of memory
+    Assert.assertEquals(80, c2.getUsed());  // c2: not spilled as spilling c1 already satisfies c3
+    Assert.assertEquals(10, c3.getUsed());
+
+    c1.free(0);
+    c2.free(80);
+    c3.free(10);
+    Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
+  }
+
   @Test
   public void shouldNotForceSpillingInDifferentModes() {
     final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());