diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java index 77c8c398be955533c260ef3071cd6f5f8933bef3..a2ee45c37e2b3d3b76074bdffaeb7aae1eb0da3a 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java @@ -176,6 +176,10 @@ public final class Platform { throw new IllegalStateException("unreachable"); } + public static void setMemory(Object object, long offset, long size, byte value) { + _UNSAFE.setMemory(object, offset, size, value); + } + public static void setMemory(long address, byte value, long size) { _UNSAFE.setMemory(address, size, value); } diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java index 09847cec9c4cac6868b74f4e31558a4ba0f431f3..3cd4264680bfc18a20ef50f35baf23e24147a6c3 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java @@ -24,6 +24,7 @@ import java.util.LinkedList; import java.util.Map; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.memory.MemoryAllocator; /** * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array. @@ -64,12 +65,19 @@ public class HeapMemoryAllocator implements MemoryAllocator { } } long[] array = new long[(int) ((size + 7) / 8)]; - return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size); + MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size); + if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) { + memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE); + } + return memory; } @Override public void free(MemoryBlock memory) { final long size = memory.size(); + if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) { + memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE); + } if (shouldPool(size)) { synchronized (this) { LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java index 5192f68c862cfc9d60b674a6fecf7ec6e7ae36bd..8bd2b06db8b8b7fdab29fa14803833af26de3b82 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java @@ -19,9 +19,20 @@ package org.apache.spark.unsafe.memory; public interface MemoryAllocator { + /** + * Whether to fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes respectively. + * This helps catch misuse of uninitialized or freed memory, but imposes some overhead. + */ + public static final boolean MEMORY_DEBUG_FILL_ENABLED = Boolean.parseBoolean( + System.getProperty("spark.memory.debugFill", "false")); + + // Same as jemalloc's debug fill values. + public static final byte MEMORY_DEBUG_FILL_CLEAN_VALUE = (byte)0xa5; + public static final byte MEMORY_DEBUG_FILL_FREED_VALUE = (byte)0x5a; + /** * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed - * to be zeroed out (call `zero()` on the result if this is necessary). + * to be zeroed out (call `fill(0)` on the result if this is necessary). */ MemoryBlock allocate(long size) throws OutOfMemoryError; diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java index 1bc924d424c02718a8dd1aa032b21d4de419606b..cd1d378bc14701650e4a5f1d6cfa16de938b1f2c 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java @@ -53,4 +53,11 @@ public class MemoryBlock extends MemoryLocation { public static MemoryBlock fromLongArray(final long[] array) { return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); } + + /** + * Fills the memory block with the specified byte value. + */ + public void fill(byte value) { + Platform.setMemory(obj, offset, length, value); + } } diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java index 98ce711176e43e71f3e32e36264c1a3622485cf8..55bcdf1ed7b06d52152161096e8ed63619bd29e3 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java @@ -27,13 +27,20 @@ public class UnsafeMemoryAllocator implements MemoryAllocator { @Override public MemoryBlock allocate(long size) throws OutOfMemoryError { long address = Platform.allocateMemory(size); - return new MemoryBlock(null, address, size); + MemoryBlock memory = new MemoryBlock(null, address, size); + if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) { + memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE); + } + return memory; } @Override public void free(MemoryBlock memory) { assert (memory.obj == null) : "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?"; + if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) { + memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE); + } Platform.freeMemory(memory.offset); } } diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java index 693ec6ec58dbdf71347569a4ab3fd95eadc34911..a77ba826fce2950c95a9ff533405219360a98685 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java @@ -17,6 +17,9 @@ package org.apache.spark.unsafe; +import org.apache.spark.unsafe.memory.MemoryAllocator; +import org.apache.spark.unsafe.memory.MemoryBlock; + import org.junit.Assert; import org.junit.Test; @@ -58,4 +61,17 @@ public class PlatformUtilSuite { Assert.assertEquals((byte)i, data[i + 1]); } } + + @Test + public void memoryDebugFillEnabledInTest() { + Assert.assertTrue(MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED); + MemoryBlock onheap = MemoryAllocator.HEAP.allocate(1); + MemoryBlock offheap = MemoryAllocator.UNSAFE.allocate(1); + Assert.assertEquals( + Platform.getByte(onheap.getBaseObject(), onheap.getBaseOffset()), + MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE); + Assert.assertEquals( + Platform.getByte(offheap.getBaseObject(), offheap.getBaseOffset()), + MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE); + } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index b1a9f393423bd7b3e7ded12fb9a0fe13add5fc2a..c769ba300e5e68ed4508f25fa5e4d97cd20bb320 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -825,6 +825,7 @@ object TestSettings { javaOptions in Test += "-Dspark.testing=1", javaOptions in Test += "-Dspark.port.maxRetries=100", javaOptions in Test += "-Dspark.master.rest.enabled=false", + javaOptions in Test += "-Dspark.memory.debugFill=true", javaOptions in Test += "-Dspark.ui.enabled=false", javaOptions in Test += "-Dspark.ui.showConsoleProgress=false", javaOptions in Test += "-Dspark.unsafe.exceptionOnMemoryLeak=true",