From 96941b12f8b465df21423275f3cd3ade579b4fa1 Mon Sep 17 00:00:00 2001
From: "Zhang, Liye" <liye.zhang@intel.com>
Date: Thu, 31 Mar 2016 20:17:52 -0700
Subject: [PATCH] [SPARK-14242][CORE][NETWORK] avoid copy in compositeBuffer
 for frame decoder

## What changes were proposed in this pull request?
In this patch, we set the initial `maxNumComponents` to `Integer.MAX_VALUE` instead of the default size ( which is 16) when allocating `compositeBuffer` in `TransportFrameDecoder` because `compositeBuffer` will introduce too many memory copies underlying if `compositeBuffer` is with default `maxNumComponents` when the frame size is large (which result in many transport messages). For details, please refer to [SPARK-14242](https://issues.apache.org/jira/browse/SPARK-14242).

## How was this patch tested?
spark unit tests and manual tests.
For manual tests, we can reproduce the performance issue with following code:
`sc.parallelize(Array(1,2,3),3).mapPartitions(a=>Array(new Array[Double](1024 * 1024 * 50)).iterator).reduce((a,b)=> a).length`
It's easy to see the performance gain, both from the running time and CPU usage.

Author: Zhang, Liye <liye.zhang@intel.com>

Closes #12038 from liyezhang556520/spark-14242.
---
 .../org/apache/spark/network/util/TransportFrameDecoder.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index bd1830e6ab..fcec7dfd0c 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -140,7 +140,7 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
     }
 
     // Otherwise, create a composite buffer.
-    CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer();
+    CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
     while (remaining > 0) {
       ByteBuf next = nextBufferForFrame(remaining);
       remaining -= next.readableBytes();
-- 
GitLab