From d187e7dea9540d26b7800de4eb79863ef5f574bf Mon Sep 17 00:00:00 2001 From: Sital Kedia <skedia@fb.com> Date: Tue, 12 Apr 2016 16:10:07 -0700 Subject: [PATCH] [SPARK-14363] Fix executor OOM due to memory leak in the Sorter ## What changes were proposed in this pull request? Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOM and also memory under utilization. This is a regression partially introduced in PR https://github.com/apache/spark/pull/9241 ## How was this patch tested? Tested by running a job and observed around 30% speedup after this change. Author: Sital Kedia <skedia@fb.com> Closes #12285 from sitalkedia/executor_oom. --- .../apache/spark/shuffle/sort/ShuffleExternalSorter.java | 6 ++++-- .../apache/spark/shuffle/sort/ShuffleInMemorySorter.java | 7 +++++++ .../util/collection/unsafe/sort/UnsafeExternalSorter.java | 7 +++++-- .../util/collection/unsafe/sort/UnsafeInMemorySorter.java | 7 +++++++ 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index 81ee7ab58a..3c2980e442 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -215,8 +215,6 @@ final class ShuffleExternalSorter extends MemoryConsumer { } } - inMemSorter.reset(); - if (!isLastFile) { // i.e. this is a spill file // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter @@ -255,6 +253,10 @@ final class ShuffleExternalSorter extends MemoryConsumer { writeSortedFile(false); final long spillSize = freeMemory(); + inMemSorter.reset(); + // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the + // records. Otherwise, if the task is over allocated memory, then without freeing the memory pages, + // we might not be able to get memory for the pointer array. taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); return spillSize; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index fe79ff0e30..76b0e6a304 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -51,9 +51,12 @@ final class ShuffleInMemorySorter { */ private int pos = 0; + private int initialSize; + ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) { this.consumer = consumer; assert (initialSize > 0); + this.initialSize = initialSize; this.array = consumer.allocateArray(initialSize); this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE); } @@ -70,6 +73,10 @@ final class ShuffleInMemorySorter { } public void reset() { + if (consumer != null) { + consumer.freeArray(array); + this.array = consumer.allocateArray(initialSize); + } pos = 0; } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index ded8f0472b..ef79b49083 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -200,14 +200,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer { spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix()); } spillWriter.close(); - - inMemSorter.reset(); } final long spillSize = freeMemory(); // Note that this is more-or-less going to be a multiple of the page size, so wasted space in // pages will currently be counted as memory spilled even though that space isn't actually // written to disk. This also counts the space needed to store the sorter's pointer array. + inMemSorter.reset(); + // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the + // records. Otherwise, if the task is over allocated memory, then without freeing the memory pages, + // we might not be able to get memory for the pointer array. + taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); return spillSize; diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 145c3a1950..01eae0e8dc 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -84,6 +84,8 @@ public final class UnsafeInMemorySorter { */ private int pos = 0; + private long initialSize; + public UnsafeInMemorySorter( final MemoryConsumer consumer, final TaskMemoryManager memoryManager, @@ -102,6 +104,7 @@ public final class UnsafeInMemorySorter { LongArray array) { this.consumer = consumer; this.memoryManager = memoryManager; + this.initialSize = array.size(); if (recordComparator != null) { this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); @@ -123,6 +126,10 @@ public final class UnsafeInMemorySorter { } public void reset() { + if (consumer != null) { + consumer.freeArray(array); + this.array = consumer.allocateArray(initialSize); + } pos = 0; } -- GitLab