From 02bb0682e68a2ce81f3b98d33649d368da7f2b3d Mon Sep 17 00:00:00 2001
From: Herman van Hovell <hvanhovell@databricks.com>
Date: Wed, 27 Sep 2017 23:08:30 +0200
Subject: [PATCH] [SPARK-22143][SQL] Fix memory leak in OffHeapColumnVector

## What changes were proposed in this pull request?
`WritableColumnVector` does not close its child column vectors. This can create memory leaks for `OffHeapColumnVector`, where we do not clean up the memory allocated by a vector's children. This can be especially bad for string columns (which use a child byte column vector).
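
A minimal sketch of the leak path (illustrative only; it assumes the vector classes are on the classpath and uses only the append/close APIs exercised by the tests below):

```scala
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector
import org.apache.spark.sql.types.StringType

object LeakSketch {
  def main(args: Array[String]): Unit = {
    // A StringType vector allocates a child byte vector (its arrayData)
    // to hold the actual UTF-8 bytes off-heap.
    val vector = new OffHeapColumnVector(1024, StringType)
    val utf8 = "str".getBytes("utf8")
    (0 until 1024).foreach(_ => vector.appendByteArray(utf8, 0, utf8.length))

    // Without the fix, close() frees only the parent's native buffers
    // (nulls, data, and the length/offset buffers); the child byte
    // vector's allocation is never released. With this patch,
    // super.close() closes the children first.
    vector.close()
  }
}
```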

## How was this patch tested?
I have updated the existing tests to always exercise both on-heap and off-heap vectors. Testing and diagnosis were done locally.
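
The updated suites use a small loan-pattern helper so each test body runs against both implementations and the vector is always closed; a condensed sketch of the helper added to `ColumnVectorSuite`:

```scala
private def testVectors(
    name: String,
    size: Int,
    dt: DataType)(
    block: WritableColumnVector => Unit): Unit = {
  test(name) {
    // Run the same assertions against both vector implementations,
    // closing each vector even if the block throws.
    Seq(new OnHeapColumnVector(size, dt), new OffHeapColumnVector(size, dt)).foreach { v =>
      try block(v) finally v.close()
    }
  }
}
```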

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #19367 from hvanhovell/SPARK-22143.
---
 .../vectorized/OffHeapColumnVector.java       |   1 +
 .../vectorized/OnHeapColumnVector.java        |  10 +
 .../vectorized/WritableColumnVector.java      |  18 ++
 .../vectorized/ColumnVectorSuite.scala        | 102 +++++----
 .../vectorized/ColumnarBatchSuite.scala       | 194 ++++++++----------
 5 files changed, 165 insertions(+), 160 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index e1d36858d4..8cbc895506 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -85,6 +85,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
 
   @Override
   public void close() {
+    super.close();
     Platform.freeMemory(nulls);
     Platform.freeMemory(data);
     Platform.freeMemory(lengthData);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 96a452978c..2725a29eea 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -90,6 +90,16 @@ public final class OnHeapColumnVector extends WritableColumnVector {
 
   @Override
   public void close() {
+    super.close();
+    nulls = null;
+    byteData = null;
+    shortData = null;
+    intData = null;
+    longData = null;
+    floatData = null;
+    doubleData = null;
+    arrayLengths = null;
+    arrayOffsets = null;
   }
 
   //
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 0bddc351e1..163f2511e5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -59,6 +59,24 @@ public abstract class WritableColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public void close() {
+    if (childColumns != null) {
+      for (int i = 0; i < childColumns.length; i++) {
+        childColumns[i].close();
+        childColumns[i] = null;
+      }
+      childColumns = null;
+    }
+    if (dictionaryIds != null) {
+      dictionaryIds.close();
+      dictionaryIds = null;
+    }
+    dictionary = null;
+    resultStruct = null;
+    resultArray = null;
+  }
+
   public void reserve(int requiredCapacity) {
     if (requiredCapacity > capacity) {
       int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index f7b06c97f9..85da8270d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -25,19 +25,24 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
-
-  var testVector: WritableColumnVector = _
-
-  private def allocate(capacity: Int, dt: DataType): WritableColumnVector = {
-    new OnHeapColumnVector(capacity, dt)
+  private def withVector(
+      vector: WritableColumnVector)(
+      block: WritableColumnVector => Unit): Unit = {
+    try block(vector) finally vector.close()
   }
 
-  override def afterEach(): Unit = {
-    testVector.close()
+  private def testVectors(
+      name: String,
+      size: Int,
+      dt: DataType)(
+      block: WritableColumnVector => Unit): Unit = {
+    test(name) {
+      withVector(new OnHeapColumnVector(size, dt))(block)
+      withVector(new OffHeapColumnVector(size, dt))(block)
+    }
   }
 
-  test("boolean") {
-    testVector = allocate(10, BooleanType)
+  testVectors("boolean", 10, BooleanType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendBoolean(i % 2 == 0)
     }
@@ -49,8 +54,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("byte") {
-    testVector = allocate(10, ByteType)
+  testVectors("byte", 10, ByteType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendByte(i.toByte)
     }
@@ -58,12 +62,11 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     val array = new ColumnVector.Array(testVector)
 
     (0 until 10).foreach { i =>
-      assert(array.get(i, ByteType) === (i.toByte))
+      assert(array.get(i, ByteType) === i.toByte)
     }
   }
 
-  test("short") {
-    testVector = allocate(10, ShortType)
+  testVectors("short", 10, ShortType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendShort(i.toShort)
     }
@@ -71,12 +74,11 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     val array = new ColumnVector.Array(testVector)
 
     (0 until 10).foreach { i =>
-      assert(array.get(i, ShortType) === (i.toShort))
+      assert(array.get(i, ShortType) === i.toShort)
     }
   }
 
-  test("int") {
-    testVector = allocate(10, IntegerType)
+  testVectors("int", 10, IntegerType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendInt(i)
     }
@@ -88,8 +90,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("long") {
-    testVector = allocate(10, LongType)
+  testVectors("long", 10, LongType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendLong(i)
     }
@@ -101,8 +102,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("float") {
-    testVector = allocate(10, FloatType)
+  testVectors("float", 10, FloatType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendFloat(i.toFloat)
     }
@@ -114,8 +114,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("double") {
-    testVector = allocate(10, DoubleType)
+  testVectors("double", 10, DoubleType) { testVector =>
     (0 until 10).foreach { i =>
       testVector.appendDouble(i.toDouble)
     }
@@ -127,8 +126,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("string") {
-    testVector = allocate(10, StringType)
+  testVectors("string", 10, StringType) { testVector =>
     (0 until 10).map { i =>
       val utf8 = s"str$i".getBytes("utf8")
       testVector.appendByteArray(utf8, 0, utf8.length)
@@ -141,8 +139,7 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("binary") {
-    testVector = allocate(10, BinaryType)
+  testVectors("binary", 10, BinaryType) { testVector =>
     (0 until 10).map { i =>
       val utf8 = s"str$i".getBytes("utf8")
       testVector.appendByteArray(utf8, 0, utf8.length)
@@ -156,9 +153,8 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("array") {
-    val arrayType = ArrayType(IntegerType, true)
-    testVector = allocate(10, arrayType)
+  val arrayType: ArrayType = ArrayType(IntegerType, containsNull = true)
+  testVectors("array", 10, arrayType) { testVector =>
 
     val data = testVector.arrayData()
     var i = 0
@@ -181,9 +177,8 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(array.get(3, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(3, 4, 5))
   }
 
-  test("struct") {
-    val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
-    testVector = allocate(10, schema)
+  val structType: StructType = new StructType().add("int", IntegerType).add("double", DoubleType)
+  testVectors("struct", 10, structType) { testVector =>
     val c1 = testVector.getChildColumn(0)
     val c2 = testVector.getChildColumn(1)
     c1.putInt(0, 123)
@@ -193,35 +188,34 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
 
     val array = new ColumnVector.Array(testVector)
 
-    assert(array.get(0, schema).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 123)
-    assert(array.get(0, schema).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 3.45)
-    assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 456)
-    assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 5.67)
+    assert(array.get(0, structType).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 123)
+    assert(array.get(0, structType).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 3.45)
+    assert(array.get(1, structType).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 456)
+    assert(array.get(1, structType).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 5.67)
   }
 
   test("[SPARK-22092] off-heap column vector reallocation corrupts array data") {
-    val arrayType = ArrayType(IntegerType, true)
-    testVector = new OffHeapColumnVector(8, arrayType)
+    withVector(new OffHeapColumnVector(8, arrayType)) { testVector =>
+      val data = testVector.arrayData()
+      (0 until 8).foreach(i => data.putInt(i, i))
+      (0 until 8).foreach(i => testVector.putArray(i, i, 1))
 
-    val data = testVector.arrayData()
-    (0 until 8).foreach(i => data.putInt(i, i))
-    (0 until 8).foreach(i => testVector.putArray(i, i, 1))
+      // Increase vector's capacity and reallocate the data to new bigger buffers.
+      testVector.reserve(16)
 
-    // Increase vector's capacity and reallocate the data to new bigger buffers.
-    testVector.reserve(16)
-
-    // Check that none of the values got lost/overwritten.
-    val array = new ColumnVector.Array(testVector)
-    (0 until 8).foreach { i =>
-      assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(i))
+      // Check that none of the values got lost/overwritten.
+      val array = new ColumnVector.Array(testVector)
+      (0 until 8).foreach { i =>
+        assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(i))
+      }
     }
   }
 
   test("[SPARK-22092] off-heap column vector reallocation corrupts struct nullability") {
-    val structType = new StructType().add("int", IntegerType).add("double", DoubleType)
-    testVector = new OffHeapColumnVector(8, structType)
-    (0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i))
-    testVector.reserve(16)
-    (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
+    withVector(new OffHeapColumnVector(8, structType)) { testVector =>
+      (0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i))
+      testVector.reserve(16)
+      (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
+    }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index ebf7661334..983eb10368 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
 
 class ColumnarBatchSuite extends SparkFunSuite {
 
-  def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = {
+  private def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = {
     if (memMode == MemoryMode.OFF_HEAP) {
       new OffHeapColumnVector(capacity, dt)
     } else {
@@ -46,23 +46,36 @@ class ColumnarBatchSuite extends SparkFunSuite {
     }
   }
 
-  test("Null Apis") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
-      val reference = mutable.ArrayBuffer.empty[Boolean]
+  private def testVector(
+      name: String,
+      size: Int,
+      dt: DataType)(
+      block: (WritableColumnVector, MemoryMode) => Unit): Unit = {
+    test(name) {
+      Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { mode =>
+        val vector = allocate(size, dt, mode)
+        try block(vector, mode) finally {
+          vector.close()
+        }
+      }
+    }
+  }
 
-      val column = allocate(1024, IntegerType, memMode)
+  testVector("Null APIs", 1024, IntegerType) {
+    (column, memMode) =>
+      val reference = mutable.ArrayBuffer.empty[Boolean]
       var idx = 0
-      assert(column.anyNullsSet() == false)
+      assert(!column.anyNullsSet())
       assert(column.numNulls() == 0)
 
       column.appendNotNull()
       reference += false
-      assert(column.anyNullsSet() == false)
+      assert(!column.anyNullsSet())
       assert(column.numNulls() == 0)
 
       column.appendNotNulls(3)
       (1 to 3).foreach(_ => reference += false)
-      assert(column.anyNullsSet() == false)
+      assert(!column.anyNullsSet())
       assert(column.numNulls() == 0)
 
       column.appendNull()
@@ -113,16 +126,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
           assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
         }
       }
-      column.close
-    }}
   }
 
-  test("Byte Apis") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+  testVector("Byte APIs", 1024, ByteType) {
+    (column, memMode) =>
       val reference = mutable.ArrayBuffer.empty[Byte]
 
-      val column = allocate(1024, ByteType, memMode)
-
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toByte).toArray
       column.appendBytes(2, values, 0)
       reference += 10.toByte
@@ -170,17 +179,14 @@ class ColumnarBatchSuite extends SparkFunSuite {
           assert(v._1 == Platform.getByte(null, addr + v._2))
         }
       }
-    }}
   }
 
-  test("Short Apis") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+  testVector("Short APIs", 1024, ShortType) {
+    (column, memMode) =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Short]
 
-      val column = allocate(1024, ShortType, memMode)
-
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toShort).toArray
       column.appendShorts(2, values, 0)
       reference += 10.toShort
@@ -248,19 +254,14 @@ class ColumnarBatchSuite extends SparkFunSuite {
           assert(v._1 == Platform.getShort(null, addr + 2 * v._2))
         }
       }
-
-      column.close
-    }}
   }
 
-  test("Int Apis") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+  testVector("Int APIs", 1024, IntegerType) {
+    (column, memMode) =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Int]
 
-      val column = allocate(1024, IntegerType, memMode)
-
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).toArray
       column.appendInts(2, values, 0)
       reference += 10
@@ -334,18 +335,14 @@ class ColumnarBatchSuite extends SparkFunSuite {
           assert(v._1 == Platform.getInt(null, addr + 4 * v._2))
         }
       }
-      column.close
-    }}
   }
 
-  test("Long Apis") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+  testVector("Long APIs", 1024, LongType) {
+    (column, memMode) =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Long]
 
-      val column = allocate(1024, LongType, memMode)
-
       var values = (10L :: 20L :: 30L :: 40L :: 50L :: Nil).toArray
       column.appendLongs(2, values, 0)
       reference += 10L
@@ -422,17 +419,14 @@ class ColumnarBatchSuite extends SparkFunSuite {
           assert(v._1 == Platform.getLong(null, addr + 8 * v._2))
         }
       }
-    }}
   }
 
-  test("Float APIs") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+  testVector("Float APIs", 1024, FloatType) {
+    (column, memMode) =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Float]
 
-      val column = allocate(1024, FloatType, memMode)
-
       var values = (.1f :: .2f :: .3f :: .4f :: .5f :: Nil).toArray
       column.appendFloats(2, values, 0)
       reference += .1f
@@ -512,18 +506,14 @@ class ColumnarBatchSuite extends SparkFunSuite {
           assert(v._1 == Platform.getFloat(null, addr + 4 * v._2))
         }
       }
-      column.close
-    }}
   }
 
-  test("Double APIs") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+  testVector("Double APIs", 1024, DoubleType) {
+    (column, memMode) =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Double]
 
-      val column = allocate(1024, DoubleType, memMode)
-
       var values = (.1 :: .2 :: .3 :: .4 :: .5 :: Nil).toArray
       column.appendDoubles(2, values, 0)
       reference += .1
@@ -603,15 +593,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
           assert(v._1 == Platform.getDouble(null, addr + 8 * v._2))
         }
       }
-      column.close
-    }}
   }
 
-  test("String APIs") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+  testVector("String APIs", 6, StringType) {
+    (column, memMode) =>
       val reference = mutable.ArrayBuffer.empty[String]
 
-      val column = allocate(6, BinaryType, memMode)
       assert(column.arrayData().elementsAppended == 0)
 
       val str = "string"
@@ -663,15 +650,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
       column.reset()
       assert(column.arrayData().elementsAppended == 0)
-    }}
   }
 
-  test("Int Array") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
-      val column = allocate(10, new ArrayType(IntegerType, true), memMode)
+  testVector("Int Array", 10, new ArrayType(IntegerType, true)) {
+    (column, _) =>
 
       // Fill the underlying data with all the arrays back to back.
-      val data = column.arrayData();
+      val data = column.arrayData()
       var i = 0
       while (i < 6) {
         data.putInt(i, i)
@@ -709,7 +694,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.getArray(3).getInt(2) == 5)
 
       // Add a longer array which requires resizing
-      column.reset
+      column.reset()
       val array = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
       assert(data.capacity == 10)
       data.reserve(array.length)
@@ -718,63 +703,67 @@ class ColumnarBatchSuite extends SparkFunSuite {
       column.putArray(0, 0, array.length)
       assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
         === array)
-    }}
   }
 
   test("toArray for primitive types") {
-    // (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
-    (MemoryMode.ON_HEAP :: Nil).foreach { memMode => {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
       val len = 4
 
       val columnBool = allocate(len, new ArrayType(BooleanType, false), memMode)
       val boolArray = Array(false, true, false, true)
-      boolArray.zipWithIndex.map { case (v, i) => columnBool.arrayData.putBoolean(i, v) }
+      boolArray.zipWithIndex.foreach { case (v, i) => columnBool.arrayData.putBoolean(i, v) }
       columnBool.putArray(0, 0, len)
       assert(columnBool.getArray(0).toBooleanArray === boolArray)
+      columnBool.close()
 
       val columnByte = allocate(len, new ArrayType(ByteType, false), memMode)
       val byteArray = Array[Byte](0, 1, 2, 3)
-      byteArray.zipWithIndex.map { case (v, i) => columnByte.arrayData.putByte(i, v) }
+      byteArray.zipWithIndex.foreach { case (v, i) => columnByte.arrayData.putByte(i, v) }
       columnByte.putArray(0, 0, len)
       assert(columnByte.getArray(0).toByteArray === byteArray)
+      columnByte.close()
 
       val columnShort = allocate(len, new ArrayType(ShortType, false), memMode)
       val shortArray = Array[Short](0, 1, 2, 3)
-      shortArray.zipWithIndex.map { case (v, i) => columnShort.arrayData.putShort(i, v) }
+      shortArray.zipWithIndex.foreach { case (v, i) => columnShort.arrayData.putShort(i, v) }
       columnShort.putArray(0, 0, len)
       assert(columnShort.getArray(0).toShortArray === shortArray)
+      columnShort.close()
 
       val columnInt = allocate(len, new ArrayType(IntegerType, false), memMode)
       val intArray = Array(0, 1, 2, 3)
-      intArray.zipWithIndex.map { case (v, i) => columnInt.arrayData.putInt(i, v) }
+      intArray.zipWithIndex.foreach { case (v, i) => columnInt.arrayData.putInt(i, v) }
       columnInt.putArray(0, 0, len)
       assert(columnInt.getArray(0).toIntArray === intArray)
+      columnInt.close()
 
       val columnLong = allocate(len, new ArrayType(LongType, false), memMode)
       val longArray = Array[Long](0, 1, 2, 3)
-      longArray.zipWithIndex.map { case (v, i) => columnLong.arrayData.putLong(i, v) }
+      longArray.zipWithIndex.foreach { case (v, i) => columnLong.arrayData.putLong(i, v) }
       columnLong.putArray(0, 0, len)
       assert(columnLong.getArray(0).toLongArray === longArray)
+      columnLong.close()
 
       val columnFloat = allocate(len, new ArrayType(FloatType, false), memMode)
       val floatArray = Array(0.0F, 1.1F, 2.2F, 3.3F)
-      floatArray.zipWithIndex.map { case (v, i) => columnFloat.arrayData.putFloat(i, v) }
+      floatArray.zipWithIndex.foreach { case (v, i) => columnFloat.arrayData.putFloat(i, v) }
       columnFloat.putArray(0, 0, len)
       assert(columnFloat.getArray(0).toFloatArray === floatArray)
+      columnFloat.close()
 
       val columnDouble = allocate(len, new ArrayType(DoubleType, false), memMode)
       val doubleArray = Array(0.0, 1.1, 2.2, 3.3)
-      doubleArray.zipWithIndex.map { case (v, i) => columnDouble.arrayData.putDouble(i, v) }
+      doubleArray.zipWithIndex.foreach { case (v, i) => columnDouble.arrayData.putDouble(i, v) }
       columnDouble.putArray(0, 0, len)
       assert(columnDouble.getArray(0).toDoubleArray === doubleArray)
-    }}
+      columnDouble.close()
+    }
   }
 
-  test("Struct Column") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
-      val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
-      val column = allocate(1024, schema, memMode)
-
+  testVector(
+    "Struct Column",
+    10,
+    new StructType().add("int", IntegerType).add("double", DoubleType)) { (column, _) =>
       val c1 = column.getChildColumn(0)
       val c2 = column.getChildColumn(1)
       assert(c1.dataType() == IntegerType)
@@ -797,13 +786,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val s2 = column.getStruct(1)
       assert(s2.getInt(0) == 456)
       assert(s2.getDouble(1) == 5.67)
-    }}
   }
 
-  test("Nest Array in Array.") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
-      val column = allocate(10, new ArrayType(new ArrayType(IntegerType, true), true),
-        memMode)
+  testVector("Nest Array in Array", 10, new ArrayType(new ArrayType(IntegerType, true), true)) {
+    (column, _) =>
       val childColumn = column.arrayData()
       val data = column.arrayData().arrayData()
       (0 until 6).foreach {
@@ -829,13 +815,14 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.getArray(2).getArray(1).getInt(1) === 4)
       assert(column.getArray(2).getArray(1).getInt(2) === 5)
       assert(column.isNullAt(3))
-    }
   }
 
-  test("Nest Struct in Array.") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
-      val schema = new StructType().add("int", IntegerType).add("long", LongType)
-      val column = allocate(10, new ArrayType(schema, true), memMode)
+  private val structType: StructType = new StructType().add("i", IntegerType).add("l", LongType)
+
+  testVector(
+    "Nest Struct in Array",
+    10,
+    new ArrayType(structType, true)) { (column, _) =>
       val data = column.arrayData()
       val c0 = data.getChildColumn(0)
       val c1 = data.getChildColumn(1)
@@ -850,22 +837,21 @@ class ColumnarBatchSuite extends SparkFunSuite {
       column.putArray(1, 1, 3)
       column.putArray(2, 4, 2)
 
-      assert(column.getArray(0).getStruct(0, 2).toSeq(schema) === Seq(0, 0))
-      assert(column.getArray(0).getStruct(1, 2).toSeq(schema) === Seq(1, 10))
-      assert(column.getArray(1).getStruct(0, 2).toSeq(schema) === Seq(1, 10))
-      assert(column.getArray(1).getStruct(1, 2).toSeq(schema) === Seq(2, 20))
-      assert(column.getArray(1).getStruct(2, 2).toSeq(schema) === Seq(3, 30))
-      assert(column.getArray(2).getStruct(0, 2).toSeq(schema) === Seq(4, 40))
-      assert(column.getArray(2).getStruct(1, 2).toSeq(schema) === Seq(5, 50))
-    }
+      assert(column.getArray(0).getStruct(0, 2).toSeq(structType) === Seq(0, 0))
+      assert(column.getArray(0).getStruct(1, 2).toSeq(structType) === Seq(1, 10))
+      assert(column.getArray(1).getStruct(0, 2).toSeq(structType) === Seq(1, 10))
+      assert(column.getArray(1).getStruct(1, 2).toSeq(structType) === Seq(2, 20))
+      assert(column.getArray(1).getStruct(2, 2).toSeq(structType) === Seq(3, 30))
+      assert(column.getArray(2).getStruct(0, 2).toSeq(structType) === Seq(4, 40))
+      assert(column.getArray(2).getStruct(1, 2).toSeq(structType) === Seq(5, 50))
   }
 
-  test("Nest Array in Struct.") {
-    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
-      val schema = new StructType()
-        .add("int", IntegerType)
-        .add("array", new ArrayType(IntegerType, true))
-      val column = allocate(10, schema, memMode)
+  testVector(
+    "Nest Array in Struct",
+    10,
+    new StructType()
+      .add("int", IntegerType)
+      .add("array", new ArrayType(IntegerType, true))) { (column, _) =>
       val c0 = column.getChildColumn(0)
       val c1 = column.getChildColumn(1)
       c0.putInt(0, 0)
@@ -886,18 +872,15 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.getStruct(1).getArray(1).toIntArray() === Array(2))
       assert(column.getStruct(2).getInt(0) === 2)
       assert(column.getStruct(2).getArray(1).toIntArray() === Array(3, 4, 5))
-    }
   }
 
-  test("Nest Struct in Struct.") {
-    (MemoryMode.ON_HEAP :: Nil).foreach { memMode =>
-      val subSchema = new StructType()
-        .add("int", IntegerType)
-        .add("int", IntegerType)
-      val schema = new StructType()
-        .add("int", IntegerType)
-        .add("struct", subSchema)
-      val column = allocate(10, schema, memMode)
+  private val subSchema: StructType = new StructType()
+    .add("int", IntegerType)
+    .add("int", IntegerType)
+  testVector(
+    "Nest Struct in Struct",
+    10,
+    new StructType().add("int", IntegerType).add("struct", subSchema)) { (column, _) =>
       val c0 = column.getChildColumn(0)
       val c1 = column.getChildColumn(1)
       c0.putInt(0, 0)
@@ -919,7 +902,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.getStruct(1).getStruct(1, 2).toSeq(subSchema) === Seq(8, 80))
       assert(column.getStruct(2).getInt(0) === 2)
       assert(column.getStruct(2).getStruct(1, 2).toSeq(subSchema) === Seq(9, 90))
-    }
   }
 
   test("ColumnarBatch basic") {
@@ -1040,7 +1022,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val it4 = batch.rowIterator()
       rowEquals(it4.next(), Row(null, 2.2, 2, "abc"))
 
-      batch.close
+      batch.close()
     }}
   }
 
-- 
GitLab