Skip to content
Snippets Groups Projects
Commit c9577148 authored by Kazuaki Ishizaki's avatar Kazuaki Ishizaki Committed by Wenchen Fan
Browse files

[SPARK-22508][SQL] Fix 64KB JVM bytecode limit problem with GenerateUnsafeRowJoiner.create()

## What changes were proposed in this pull request?

This PR changes the code generation in `GenerateUnsafeRowJoiner.create()` so that the generated statements that operate on the bitmap and the offsets are placed into separate methods when their size could become large.

## How was this patch tested?

Added a new test case to `GenerateUnsafeRowJoinerSuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19737 from kiszk/SPARK-22508.
parent 9d45e675
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,9 @@
package org.apache.spark.sql.catalyst.expressions.codegen
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.Platform
......@@ -51,6 +54,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
}
def create(schema1: StructType, schema2: StructType): UnsafeRowJoiner = {
val ctx = new CodegenContext
val offset = Platform.BYTE_ARRAY_OFFSET
val getLong = "Platform.getLong"
val putLong = "Platform.putLong"
......@@ -88,8 +92,14 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
s"$getLong(obj2, offset2 + ${(i - bitset1Words) * 8})"
}
}
s"$putLong(buf, ${offset + i * 8}, $bits);"
}.mkString("\n")
s"$putLong(buf, ${offset + i * 8}, $bits);\n"
}
val copyBitsets = ctx.splitExpressions(
expressions = copyBitset,
funcName = "copyBitsetFunc",
arguments = ("java.lang.Object", "obj1") :: ("long", "offset1") ::
("java.lang.Object", "obj2") :: ("long", "offset2") :: Nil)
// --------------------- copy fixed length portion from row 1 ----------------------- //
var cursor = offset + outputBitsetWords * 8
......@@ -150,11 +160,14 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
s"(${(outputBitsetWords - bitset2Words + schema1.size) * 8}L + numBytesVariableRow1)"
}
val cursor = offset + outputBitsetWords * 8 + i * 8
s"""
|$putLong(buf, $cursor, $getLong(buf, $cursor) + ($shift << 32));
""".stripMargin
s"$putLong(buf, $cursor, $getLong(buf, $cursor) + ($shift << 32));\n"
}
}.mkString("\n")
}
// Split the per-field offset fix-up statements into helper methods so that the generated
// join() body stays under the JVM's 64KB-per-method bytecode limit. numBytesVariableRow1 is
// the only local of join() handed to the helpers. The helper name is distinct from the
// bitset-copy helpers ("copyBitsetFunc") so the generated code is easier to read.
val updateOffsets = ctx.splitExpressions(
  expressions = updateOffset,
  funcName = "updateOffsetsFunc",
  arguments = ("long", "numBytesVariableRow1") :: Nil)
// ------------------------ Finally, put everything together --------------------------- //
val codeBody = s"""
......@@ -166,6 +179,8 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
| private byte[] buf = new byte[64];
| private UnsafeRow out = new UnsafeRow(${schema1.size + schema2.size});
|
| ${ctx.declareAddedFunctions()}
|
| public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) {
| // row1: ${schema1.size} fields, $bitset1Words words in bitset
| // row2: ${schema2.size}, $bitset2Words words in bitset
......@@ -180,12 +195,12 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
| final java.lang.Object obj2 = row2.getBaseObject();
| final long offset2 = row2.getBaseOffset();
|
| $copyBitset
| $copyBitsets
| $copyFixedLengthRow1
| $copyFixedLengthRow2
| $copyVariableLengthRow1
| $copyVariableLengthRow2
| $updateOffset
| $updateOffsets
|
| out.pointTo(buf, sizeInBytes);
|
......
......@@ -66,6 +66,11 @@ class GenerateUnsafeRowJoinerSuite extends SparkFunSuite {
}
}
test("SPARK-22508: GenerateUnsafeRowJoiner.create should not generate codes beyond 64KB") {
  // Join two rows of 3000 fields each, so the generated joiner is large enough to exercise
  // the method-splitting path.
  val fieldCount = 3000
  testConcatOnce(fieldCount, fieldCount, variable)
}
private def testConcat(numFields1: Int, numFields2: Int, candidateTypes: Seq[DataType]): Unit = {
for (i <- 0 until 10) {
testConcatOnce(numFields1, numFields2, candidateTypes)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment