From 22dd65f58e12cb3a883d106fcccdff25a2a00fe8 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Tue, 13 Jun 2017 00:12:34 +0800
Subject: [PATCH] [SPARK-21046][SQL] simplify the array offset and length in
 ColumnVector

## What changes were proposed in this pull request?

Currently when a `ColumnVector` stores array type elements, we will use 2 arrays for lengths and offsets and implement them individually in on-heap and off-heap column vector.

In this PR, we use one array to represent both offsets and lengths, so that we can treat it as `ColumnVector` and all the logic can go to the base class `ColumnVector`

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18260 from cloud-fan/put.
---
 .../execution/vectorized/ColumnVector.java    | 35 ++++++-------
 .../vectorized/OffHeapColumnVector.java       | 47 ++----------------
 .../vectorized/OnHeapColumnVector.java        | 49 +++----------------
 .../vectorized/ColumnarBatchSuite.scala       | 17 ++++---
 4 files changed, 38 insertions(+), 110 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 24260a6019..e50799eeb2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.execution.vectorized;
 
 import java.math.BigDecimal;
@@ -518,19 +519,13 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract double getDouble(int rowId);
 
   /**
-   * Puts a byte array that already exists in this column.
-   */
-  public abstract void putArray(int rowId, int offset, int length);
-
-  /**
-   * Returns the length of the array at rowid.
+   * After writing array elements to the child column vector, call this method to set the offset and
+   * size of the written array.
    */
-  public abstract int getArrayLength(int rowId);
-
-  /**
-   * Returns the offset of the array at rowid.
-   */
-  public abstract int getArrayOffset(int rowId);
+  public void putArrayOffsetAndSize(int rowId, int offset, int size) {
+    long offsetAndSize = (((long) offset) << 32) | size;
+    putLong(rowId, offsetAndSize);
+  }
 
   /**
    * Returns a utility object to get structs.
@@ -553,8 +548,9 @@ public abstract class ColumnVector implements AutoCloseable {
    * Returns the array at rowid.
    */
   public final Array getArray(int rowId) {
-    resultArray.length = getArrayLength(rowId);
-    resultArray.offset = getArrayOffset(rowId);
+    long offsetAndSize = getLong(rowId);
+    resultArray.offset = (int) (offsetAndSize >> 32);
+    resultArray.length = (int) offsetAndSize;
     return resultArray;
   }
 
@@ -566,7 +562,12 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Sets the value at rowId to `value`.
    */
-  public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
+  public int putByteArray(int rowId, byte[] value, int offset, int length) {
+    int result = arrayData().appendBytes(length, value, offset);
+    putArrayOffsetAndSize(rowId, result, length);
+    return result;
+  }
+
   public final int putByteArray(int rowId, byte[] value) {
     return putByteArray(rowId, value, 0, value.length);
   }
@@ -829,13 +830,13 @@ public abstract class ColumnVector implements AutoCloseable {
   public final int appendByteArray(byte[] value, int offset, int length) {
     int copiedOffset = arrayData().appendBytes(length, value, offset);
     reserve(elementsAppended + 1);
-    putArray(elementsAppended, copiedOffset, length);
+    putArrayOffsetAndSize(elementsAppended, copiedOffset, length);
     return elementsAppended++;
   }
 
   public final int appendArray(int length) {
     reserve(elementsAppended + 1);
-    putArray(elementsAppended, arrayData().elementsAppended, length);
+    putArrayOffsetAndSize(elementsAppended, arrayData().elementsAppended, length);
     return elementsAppended++;
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index a7d3744d00..4dc4d34db3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -34,19 +34,15 @@ public final class OffHeapColumnVector extends ColumnVector {
   // The data stored in these two allocations need to maintain binary compatible. We can
   // directly pass this buffer to external components.
   private long nulls;
+  // The actually data of this column vector will be stored here. If it's an array column vector,
+  // we will store the offsets and lengths here, and store the element data in child column vector.
   private long data;
 
-  // Set iff the type is array.
-  private long lengthData;
-  private long offsetData;
-
   protected OffHeapColumnVector(int capacity, DataType type) {
     super(capacity, type, MemoryMode.OFF_HEAP);
 
     nulls = 0;
     data = 0;
-    lengthData = 0;
-    offsetData = 0;
 
     reserveInternal(capacity);
     reset();
@@ -66,12 +62,8 @@ public final class OffHeapColumnVector extends ColumnVector {
   public void close() {
     Platform.freeMemory(nulls);
     Platform.freeMemory(data);
-    Platform.freeMemory(lengthData);
-    Platform.freeMemory(offsetData);
     nulls = 0;
     data = 0;
-    lengthData = 0;
-    offsetData = 0;
   }
 
   //
@@ -395,35 +387,6 @@ public final class OffHeapColumnVector extends ColumnVector {
     }
   }
 
-  //
-  // APIs dealing with Arrays.
-  //
-  @Override
-  public void putArray(int rowId, int offset, int length) {
-    assert(offset >= 0 && offset + length <= childColumns[0].capacity);
-    Platform.putInt(null, lengthData + 4 * rowId, length);
-    Platform.putInt(null, offsetData + 4 * rowId, offset);
-  }
-
-  @Override
-  public int getArrayLength(int rowId) {
-    return Platform.getInt(null, lengthData + 4 * rowId);
-  }
-
-  @Override
-  public int getArrayOffset(int rowId) {
-    return Platform.getInt(null, offsetData + 4 * rowId);
-  }
-
-  // APIs dealing with ByteArrays
-  @Override
-  public int putByteArray(int rowId, byte[] value, int offset, int length) {
-    int result = arrayData().appendBytes(length, value, offset);
-    Platform.putInt(null, lengthData + 4 * rowId, length);
-    Platform.putInt(null, offsetData + 4 * rowId, result);
-    return result;
-  }
-
   @Override
   public void loadBytes(ColumnVector.Array array) {
     if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
@@ -438,10 +401,8 @@ public final class OffHeapColumnVector extends ColumnVector {
   protected void reserveInternal(int newCapacity) {
     int oldCapacity = (this.data == 0L) ? 0 : capacity;
     if (this.resultArray != null) {
-      this.lengthData =
-          Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
-      this.offsetData =
-          Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4);
+      // need a long as offset and length for each array.
+      this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8);
     } else if (type instanceof ByteType || type instanceof BooleanType) {
       this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
     } else if (type instanceof ShortType) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 94ed32294c..4d23405dc7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -43,14 +43,12 @@ public final class OnHeapColumnVector extends ColumnVector {
   private byte[] byteData;
   private short[] shortData;
   private int[] intData;
+  // This is not only used to store data for long column vector, but also can store offsets and
+  // lengths for array column vector.
   private long[] longData;
   private float[] floatData;
   private double[] doubleData;
 
-  // Only set if type is Array.
-  private int[] arrayLengths;
-  private int[] arrayOffsets;
-
   protected OnHeapColumnVector(int capacity, DataType type) {
     super(capacity, type, MemoryMode.ON_HEAP);
     reserveInternal(capacity);
@@ -366,55 +364,22 @@ public final class OnHeapColumnVector extends ColumnVector {
     }
   }
 
-  //
-  // APIs dealing with Arrays
-  //
-
-  @Override
-  public int getArrayLength(int rowId) {
-    return arrayLengths[rowId];
-  }
-  @Override
-  public int getArrayOffset(int rowId) {
-    return arrayOffsets[rowId];
-  }
-
-  @Override
-  public void putArray(int rowId, int offset, int length) {
-    arrayOffsets[rowId] = offset;
-    arrayLengths[rowId] = length;
-  }
-
   @Override
   public void loadBytes(ColumnVector.Array array) {
     array.byteArray = byteData;
     array.byteArrayOffset = array.offset;
   }
 
-  //
-  // APIs dealing with Byte Arrays
-  //
-
-  @Override
-  public int putByteArray(int rowId, byte[] value, int offset, int length) {
-    int result = arrayData().appendBytes(length, value, offset);
-    arrayOffsets[rowId] = result;
-    arrayLengths[rowId] = length;
-    return result;
-  }
-
   // Spilt this function out since it is the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
     if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) {
-      int[] newLengths = new int[newCapacity];
-      int[] newOffsets = new int[newCapacity];
-      if (this.arrayLengths != null) {
-        System.arraycopy(this.arrayLengths, 0, newLengths, 0, capacity);
-        System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, capacity);
+      // need 1 long as offset and length for each array.
+      if (longData == null || longData.length < newCapacity) {
+        long[] newData = new long[newCapacity];
+        if (longData != null) System.arraycopy(longData, 0, newData, 0, capacity);
+        longData = newData;
       }
-      arrayLengths = newLengths;
-      arrayOffsets = newOffsets;
     } else if (type instanceof BooleanType) {
       if (byteData == null || byteData.length < newCapacity) {
         byte[] newData = new byte[newCapacity];
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index e48e3f6402..5c4128a70d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -631,7 +631,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.arrayData().elementsAppended == 17)
 
       // Put the same "ll" at offset. This should not allocate more memory in the column.
-      column.putArray(idx, offset, 2)
+      column.putArrayOffsetAndSize(idx, offset, 2)
       reference += "ll"
       idx += 1
       assert(column.arrayData().elementsAppended == 17)
@@ -644,7 +644,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.arrayData().elementsAppended == 17 + (s + s).length)
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
+        val offsetAndLength = column.getLong(v._2)
+        assert(v._1.length == offsetAndLength.toInt, "MemoryMode=" + memMode)
         assert(v._1 == column.getUTF8String(v._2).toString,
           "MemoryMode" + memMode)
       }
@@ -659,7 +660,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode)
 
       // Fill the underlying data with all the arrays back to back.
-      val data = column.arrayData();
+      val data = column.arrayData()
       var i = 0
       while (i < 6) {
         data.putInt(i, i)
@@ -667,10 +668,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       // Populate it with arrays [0], [1, 2], [], [3, 4, 5]
-      column.putArray(0, 0, 1)
-      column.putArray(1, 1, 2)
-      column.putArray(2, 2, 0)
-      column.putArray(3, 3, 3)
+      column.putArrayOffsetAndSize(0, 0, 1)
+      column.putArrayOffsetAndSize(1, 1, 2)
+      column.putArrayOffsetAndSize(2, 3, 0)
+      column.putArrayOffsetAndSize(3, 3, 3)
 
       val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
       val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]]
@@ -703,7 +704,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       data.reserve(array.length)
       assert(data.capacity == array.length * 2)
       data.putInts(0, array.length, array, 0)
-      column.putArray(0, 0, array.length)
+      column.putArrayOffsetAndSize(0, 0, array.length)
       assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
         === array)
     }}
-- 
GitLab