From c9577148069d2215dc79cbf828a378591b4fba5d Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Date: Tue, 21 Nov 2017 12:16:54 +0100
Subject: [PATCH] [SPARK-22508][SQL] Fix 64KB JVM bytecode limit problem with
 GenerateUnsafeRowJoiner.create()

## What changes were proposed in this pull request?

This PR changes the code generation in `GenerateUnsafeRowJoiner.create()` so that the generated statements that copy the bitsets and update the offsets are placed into separate methods when their total size could be large.

## How was this patch tested?

Added a new test case to `GenerateUnsafeRowJoinerSuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19737 from kiszk/SPARK-22508.
---
 .../codegen/GenerateUnsafeRowJoiner.scala     | 31 ++++++++++++++-----
 .../GenerateUnsafeRowJoinerSuite.scala        |  5 +++
 2 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
index 6bc72a0d75..be5f5a73b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.catalyst.expressions.codegen
 
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.Platform
@@ -51,6 +54,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
   }
 
   def create(schema1: StructType, schema2: StructType): UnsafeRowJoiner = {
+    val ctx = new CodegenContext
     val offset = Platform.BYTE_ARRAY_OFFSET
     val getLong = "Platform.getLong"
     val putLong = "Platform.putLong"
@@ -88,8 +92,14 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
           s"$getLong(obj2, offset2 + ${(i - bitset1Words) * 8})"
         }
       }
-      s"$putLong(buf, ${offset + i * 8}, $bits);"
-    }.mkString("\n")
+      s"$putLong(buf, ${offset + i * 8}, $bits);\n"
+    }
+
+    val copyBitsets = ctx.splitExpressions(
+      expressions = copyBitset,
+      funcName = "copyBitsetFunc",
+      arguments = ("java.lang.Object", "obj1") :: ("long", "offset1") ::
+                  ("java.lang.Object", "obj2") :: ("long", "offset2") :: Nil)
 
     // --------------------- copy fixed length portion from row 1 ----------------------- //
     var cursor = offset + outputBitsetWords * 8
@@ -150,11 +160,14 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
             s"(${(outputBitsetWords - bitset2Words + schema1.size) * 8}L + numBytesVariableRow1)"
           }
         val cursor = offset + outputBitsetWords * 8 + i * 8
-        s"""
-           |$putLong(buf, $cursor, $getLong(buf, $cursor) + ($shift << 32));
-         """.stripMargin
+        s"$putLong(buf, $cursor, $getLong(buf, $cursor) + ($shift << 32));\n"
       }
-    }.mkString("\n")
+    }
+
+    val updateOffsets = ctx.splitExpressions(
+      expressions = updateOffset,
+      funcName = "updateOffsetFunc", // was "copyBitsetFunc", copy-pasted from copyBitsets
+      arguments = ("long", "numBytesVariableRow1") :: Nil)
 
     // ------------------------ Finally, put everything together  --------------------------- //
     val codeBody = s"""
@@ -166,6 +179,8 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
        |  private byte[] buf = new byte[64];
        |  private UnsafeRow out = new UnsafeRow(${schema1.size + schema2.size});
        |
+       |  ${ctx.declareAddedFunctions()}
+       |
        |  public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) {
        |    // row1: ${schema1.size} fields, $bitset1Words words in bitset
        |    // row2: ${schema2.size}, $bitset2Words words in bitset
@@ -180,12 +195,12 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
        |    final java.lang.Object obj2 = row2.getBaseObject();
        |    final long offset2 = row2.getBaseOffset();
        |
-       |    $copyBitset
+       |    $copyBitsets
        |    $copyFixedLengthRow1
        |    $copyFixedLengthRow2
        |    $copyVariableLengthRow1
        |    $copyVariableLengthRow2
-       |    $updateOffset
+       |    $updateOffsets
        |
        |    out.pointTo(buf, sizeInBytes);
        |
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
index 9f19745cef..f203f25ad1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
@@ -66,6 +66,11 @@ class GenerateUnsafeRowJoinerSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-22508: GenerateUnsafeRowJoiner.create should not generate codes beyond 64KB") {
+    val N = 3000 // number of fields in each of the two rows being joined
+    testConcatOnce(N, N, variable)
+  }
+
   private def testConcat(numFields1: Int, numFields2: Int, candidateTypes: Seq[DataType]): Unit = {
     for (i <- 0 until 10) {
       testConcatOnce(numFields1, numFields2, candidateTypes)
-- 
GitLab