From 44e501ace38ea6e6535a8731e045681175d170ec Mon Sep 17 00:00:00 2001
From: Zhan Zhang <zhanzhang@fb.com>
Date: Sun, 30 Jul 2017 18:50:19 -0700
Subject: [PATCH] [SPARK-19839][CORE] release longArray in BytesToBytesMap

## What changes were proposed in this pull request?
When BytesToBytesMap spills, its longArray should be released. Otherwise, it may not released until the task complete. This array may take a significant amount of memory, which cannot be used by later operator, such as UnsafeShuffleExternalSorter, resulting in more frequent spill in sorter. This patch release the array as destructive iterator will not use this array anymore.

## How was this patch tested?
Manual test in production

Author: Zhan Zhang <zhanzhang@fb.com>

Closes #17180 from zhzhan/memory.
---
 .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java    | 5 +++++
 .../sql/execution/UnsafeFixedWidthAggregationMapSuite.scala  | 5 +++--
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 3b6200e74f..610ace30f8 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -258,6 +258,11 @@ public final class BytesToBytesMap extends MemoryConsumer {
       this.destructive = destructive;
       if (destructive) {
         destructiveIterator = this;
+        // longArray will not be used anymore if destructive is true, release it now.
+        if (longArray != null) {
+          freeArray(longArray);
+          longArray = null;
+        }
       }
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 50d8e30245..d194f58cd1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -127,9 +127,10 @@ class UnsafeFixedWidthAggregationMapSuite
       PAGE_SIZE_BYTES
     )
     val groupKey = InternalRow(UTF8String.fromString("cats"))
+    val row = map.getAggregationBuffer(groupKey)
 
     // Looking up a key stores a zero-entry in the map (like Python Counters or DefaultDicts)
-    assert(map.getAggregationBuffer(groupKey) != null)
+    assert(row != null)
     val iter = map.iterator()
     assert(iter.next())
     iter.getKey.getString(0) should be ("cats")
@@ -138,7 +139,7 @@ class UnsafeFixedWidthAggregationMapSuite
 
     // Modifications to rows retrieved from the map should update the values in the map
     iter.getValue.setInt(0, 42)
-    map.getAggregationBuffer(groupKey).getInt(0) should be (42)
+    row.getInt(0) should be (42)
 
     map.free()
   }
-- 
GitLab