Skip to content
Snippets Groups Projects
Commit 16612638 authored by zhoukang's avatar zhoukang Committed by Shixiong Zhu
Browse files

[SPARK-21517][CORE] Avoid copying memory when transfer chunks remotely

## What changes were proposed in this pull request?

In our production cluster,oom happens when NettyBlockRpcServer receive OpenBlocks message.The reason we observed is below:
When BlockManagerManagedBuffer call ChunkedByteBuffer#toNetty, it will use Unpooled.wrappedBuffer(ByteBuffer... buffers) which use default maxNumComponents=16 in low-level CompositeByteBuf.When our component's number is bigger than 16, it will execute consolidateIfNeeded

        int numComponents = this.components.size();
        if(numComponents > this.maxNumComponents) {
            int capacity = ((CompositeByteBuf.Component)this.components.get(numComponents - 1)).endOffset;
            ByteBuf consolidated = this.allocBuffer(capacity);

            for(int c = 0; c < numComponents; ++c) {
                CompositeByteBuf.Component c1 = (CompositeByteBuf.Component)this.components.get(c);
                ByteBuf b = c1.buf;
                consolidated.writeBytes(b);
                c1.freeIfNecessary();
            }

            CompositeByteBuf.Component var7 = new CompositeByteBuf.Component(consolidated);
            var7.endOffset = var7.length;
            this.components.clear();
            this.components.add(var7);
        }

in CompositeByteBuf which will consume some memory during buffer copy.
We can use another api Unpooled. wrappedBuffer(int maxNumComponents, ByteBuffer... buffers) to avoid this comsuming.

## How was this patch tested?

Test in production cluster.

Author: zhoukang <zhoukang@xiaomi.com>

Closes #18723 from caneGuy/zhoukang/fix-chunkbuffer.
parent 300807c6
No related branches found
No related tags found
No related merge requests found
......@@ -66,7 +66,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
* Wrap this buffer to view it as a Netty ByteBuf.
*/
def toNetty: ByteBuf = {
Unpooled.wrappedBuffer(getChunks(): _*)
Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)
}
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment