From 16612638f0539f197eb7deb1be2ec53fed60d707 Mon Sep 17 00:00:00 2001
From: zhoukang <zhoukang@xiaomi.com>
Date: Tue, 25 Jul 2017 17:59:21 -0700
Subject: [PATCH] [SPARK-21517][CORE] Avoid copying memory when transferring
 chunks remotely

## What changes were proposed in this pull request?

In our production cluster, an OOM occurs when NettyBlockRpcServer receives an OpenBlocks message. The cause we observed is as follows:
When BlockManagerManagedBuffer calls ChunkedByteBuffer#toNetty, it uses Unpooled.wrappedBuffer(ByteBuffer... buffers), which relies on the default maxNumComponents=16 of the underlying CompositeByteBuf. When the number of components exceeds 16, CompositeByteBuf executes consolidateIfNeeded (decompiled below):

        int numComponents = this.components.size();
        if (numComponents > this.maxNumComponents) {
            // Allocate a single buffer large enough to hold every component.
            int capacity = ((CompositeByteBuf.Component) this.components.get(numComponents - 1)).endOffset;
            ByteBuf consolidated = this.allocBuffer(capacity);

            // Copy each component's bytes into the new buffer, then release it.
            for (int c = 0; c < numComponents; ++c) {
                CompositeByteBuf.Component component = (CompositeByteBuf.Component) this.components.get(c);
                consolidated.writeBytes(component.buf);
                component.freeIfNecessary();
            }

            // Replace all components with the single consolidated copy.
            CompositeByteBuf.Component consolidatedComponent = new CompositeByteBuf.Component(consolidated);
            consolidatedComponent.endOffset = consolidatedComponent.length;
            this.components.clear();
            this.components.add(consolidatedComponent);
        }

This consolidation allocates a new buffer with the combined capacity and copies every component into it, consuming extra memory during the copy. We can use the overload Unpooled.wrappedBuffer(int maxNumComponents, ByteBuffer... buffers) and pass the actual chunk count as maxNumComponents to avoid this copy.
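
As a minimal sketch (not part of the patch), the snippet below contrasts the two overloads; the chunk count and sizes are hypothetical, chosen only to exceed the default maxNumComponents of 16:

    import java.nio.ByteBuffer
    import io.netty.buffer.{ByteBuf, Unpooled}

    // Hypothetical layout: 32 chunks, more than the default maxNumComponents (16).
    val chunks: Array[ByteBuffer] = Array.fill(32)(ByteBuffer.allocate(1024))

    // Default overload: maxNumComponents = 16, so adding the 17th component
    // triggers consolidateIfNeeded, which allocates a new buffer and copies all bytes.
    val copying: ByteBuf = Unpooled.wrappedBuffer(chunks: _*)

    // Overload used by this patch: maxNumComponents = chunks.length, so no
    // consolidation happens and the composite stays a zero-copy view of the chunks.
    val zeroCopy: ByteBuf = Unpooled.wrappedBuffer(chunks.length, chunks: _*)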

## How was this patch tested?

Tested in our production cluster.

Author: zhoukang <zhoukang@xiaomi.com>

Closes #18723 from caneGuy/zhoukang/fix-chunkbuffer.
---
 .../main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 2f905c8af0..f48bfd5c25 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -66,7 +66,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    * Wrap this buffer to view it as a Netty ByteBuf.
    */
   def toNetty: ByteBuf = {
-    Unpooled.wrappedBuffer(getChunks(): _*)
+    Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)
   }
 
   /**
-- 
GitLab