diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
index 949035bfb177ce8332141fc8e94aac7c76283696..3a10e9830f5811965273eff0afc250aca0539c33 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
@@ -59,16 +59,6 @@ public final class ArrowColumnVector extends ColumnVector {
     return numNulls() > 0;
   }
 
-  @Override
-  public long nullsNativeAddress() {
-    throw new RuntimeException("Cannot get native address for arrow column");
-  }
-
-  @Override
-  public long valuesNativeAddress() {
-    throw new RuntimeException("Cannot get native address for arrow column");
-  }
-
   @Override
   public void close() {
     if (childColumns != null) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 666fd63fdcf2f40ee7c7cba859407c29f952a42b..360ed83e2af2a42b351447296e865ecf3ce550e2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -62,13 +62,6 @@ public abstract class ColumnVector implements AutoCloseable {
    */
   public abstract boolean anyNullsSet();
 
-  /**
-   * Returns the off heap ptr for the arrays backing the NULLs and values buffer. Only valid
-   * to call for off heap columns.
-   */
-  public abstract long nullsNativeAddress();
-  public abstract long valuesNativeAddress();
-
   /**
    * Returns whether the value at rowId is NULL.
    */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 2bf523b7e7198519d1aade29f7a1072736ba6a06..6b5c783d4fa8757ef1ca25c1e491eacd93facd2b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.vectorized;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
@@ -73,12 +75,12 @@ public final class OffHeapColumnVector extends WritableColumnVector {
     reset();
   }
 
-  @Override
+  @VisibleForTesting
   public long valuesNativeAddress() {
     return data;
   }
 
-  @Override
+  @VisibleForTesting
   public long nullsNativeAddress() {
     return nulls;
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index d699d292711dce54100ec896b07858195a778163..a7b103a62b17a2e56f9c1f274132063893a74723 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -79,15 +79,6 @@ public final class OnHeapColumnVector extends WritableColumnVector {
     reset();
   }
 
-  @Override
-  public long valuesNativeAddress() {
-    throw new RuntimeException("Cannot get native address for on heap column");
-  }
-  @Override
-  public long nullsNativeAddress() {
-    throw new RuntimeException("Cannot get native address for on heap column");
-  }
-
   @Override
   public void close() {
     super.close();
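With the abstract accessors gone, the raw-address API survives only on `OffHeapColumnVector`, demoted from an `@Override` to a test-only `@VisibleForTesting` method. A minimal sketch of the resulting call pattern (the caller, object name, and values here are hypothetical, not part of this patch): code that wants native addresses must now hold the concrete off-heap type rather than the `ColumnVector` abstraction.

```scala
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.unsafe.Platform

object NativeAddressSketch {
  def main(args: Array[String]): Unit = {
    // Allocate the concrete off-heap vector directly; the ColumnVector
    // abstraction no longer exposes nulls/values native addresses.
    val col = new OffHeapColumnVector(16, IntegerType)
    try {
      col.putInt(0, 42)
      // Test-only accessor: raw address of the off-heap values buffer.
      val addr = col.valuesNativeAddress()
      // Read the value back through Platform, as the benchmark below does.
      assert(Platform.getInt(null, addr) == 42)
    } finally {
      col.close() // frees the off-heap allocations
    }
  }
}
```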
RuntimeException("Cannot get native address for on heap column"); - } - @Override public void close() { super.close(); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala index 1331f157363b01a8494729aa90e57b096325b352..705b26b8c91e6b90437e37711b054eef6978f138 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala @@ -36,15 +36,6 @@ import org.apache.spark.util.collection.BitSet * Benchmark to low level memory access using different ways to manage buffers. */ object ColumnarBatchBenchmark { - - def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = { - if (memMode == MemoryMode.OFF_HEAP) { - new OffHeapColumnVector(capacity, dt) - } else { - new OnHeapColumnVector(capacity, dt) - } - } - // This benchmark reads and writes an array of ints. // TODO: there is a big (2x) penalty for a random access API for off heap. // Note: carefully if modifying this code. It's hard to reason about the JIT. @@ -151,7 +142,7 @@ object ColumnarBatchBenchmark { // Access through the column API with on heap memory val columnOnHeap = { i: Int => - val col = allocate(count, IntegerType, MemoryMode.ON_HEAP) + val col = new OnHeapColumnVector(count, IntegerType) var sum = 0L for (n <- 0L until iters) { var i = 0 @@ -170,7 +161,7 @@ object ColumnarBatchBenchmark { // Access through the column API with off heap memory def columnOffHeap = { i: Int => { - val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP) + val col = new OffHeapColumnVector(count, IntegerType) var sum = 0L for (n <- 0L until iters) { var i = 0 @@ -189,7 +180,7 @@ object ColumnarBatchBenchmark { // Access by directly getting the buffer backing the column. val columnOffheapDirect = { i: Int => - val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP) + val col = new OffHeapColumnVector(count, IntegerType) var sum = 0L for (n <- 0L until iters) { var addr = col.valuesNativeAddress() @@ -255,7 +246,7 @@ object ColumnarBatchBenchmark { // Adding values by appending, instead of putting. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 4a6c8f5521d1882f2295fe373f36095cf635f114..80a50866aa504801a7ca7bb3f08e796ab9eda5e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -50,11 +50,11 @@ class ColumnarBatchSuite extends SparkFunSuite {
       name: String,
       size: Int,
       dt: DataType)(
-      block: (WritableColumnVector, MemoryMode) => Unit): Unit = {
+      block: WritableColumnVector => Unit): Unit = {
     test(name) {
       Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { mode =>
         val vector = allocate(size, dt, mode)
-        try block(vector, mode) finally {
+        try block(vector) finally {
           vector.close()
         }
       }
@@ -62,7 +62,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   testVector("Null APIs", 1024, IntegerType) {
-    (column, memMode) =>
+    column =>
       val reference = mutable.ArrayBuffer.empty[Boolean]
       var idx = 0
       assert(!column.anyNullsSet())
@@ -121,15 +121,11 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
       reference.zipWithIndex.foreach { v =>
         assert(v._1 == column.isNullAt(v._2))
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.nullsNativeAddress()
-          assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
-        }
       }
   }
 
   testVector("Byte APIs", 1024, ByteType) {
-    (column, memMode) =>
+    column =>
       val reference = mutable.ArrayBuffer.empty[Byte]
 
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toByte).toArray
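The helper change above is the crux of the test refactor: `testVector` now hands the block only the vector, while both memory modes are still exercised by the surrounding loop. A sketch of the updated helper with the suite's `allocate` (not shown in this diff) inlined as an assumed equivalent, and the ScalaTest `test(name)` wrapper omitted for self-containment:

```scala
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types.DataType

object TestVectorSketch {
  def testVector(name: String, size: Int, dt: DataType)(
      block: WritableColumnVector => Unit): Unit = {
    // Run the block once per memory mode; the block no longer needs the
    // mode, since failure messages can name the concrete vector class.
    Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { mode =>
      val vector =
        if (mode == MemoryMode.OFF_HEAP) new OffHeapColumnVector(size, dt)
        else new OnHeapColumnVector(size, dt)
      try block(vector) finally vector.close()
    }
  }
}
```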
@@ -173,16 +169,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
       idx += 3
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getByte(v._2), "MemoryMode" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getByte(null, addr + v._2))
-        }
+        assert(v._1 == column.getByte(v._2), "VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Short APIs", 1024, ShortType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Short]
@@ -248,16 +240,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getShort(v._2), "Seed = " + seed + " Mem Mode=" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getShort(null, addr + 2 * v._2))
-        }
+        assert(v._1 == column.getShort(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Int APIs", 1024, IntegerType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Int]
@@ -329,16 +318,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Mem Mode=" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getInt(null, addr + 4 * v._2))
-        }
+        assert(v._1 == column.getInt(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Long APIs", 1024, LongType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Long]
@@ -413,16 +399,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
       reference.zipWithIndex.foreach { v =>
         assert(v._1 == column.getLong(v._2), "idx=" + v._2 +
-          " Seed = " + seed + " MemMode=" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getLong(null, addr + 8 * v._2))
-        }
+          " Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Float APIs", 1024, FloatType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Float]
@@ -500,16 +482,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
      reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getFloat(v._2), "Seed = " + seed + " MemMode=" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getFloat(null, addr + 4 * v._2))
-        }
+        assert(v._1 == column.getFloat(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Double APIs", 1024, DoubleType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Double]
@@ -587,16 +566,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " MemMode=" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getDouble(null, addr + 8 * v._2))
-        }
+        assert(v._1 == column.getDouble(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("String APIs", 6, StringType) {
-    (column, memMode) =>
+    column =>
       val reference = mutable.ArrayBuffer.empty[String]
 
       assert(column.arrayData().elementsAppended == 0)
@@ -643,9 +619,9 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.arrayData().elementsAppended == 17 + (s + s).length)
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
-        assert(v._1 == column.getUTF8String(v._2).toString,
-          "MemoryMode" + memMode)
+        val errMsg = "VectorType=" + column.getClass.getSimpleName
+        assert(v._1.length == column.getArrayLength(v._2), errMsg)
+        assert(v._1 == column.getUTF8String(v._2).toString, errMsg)
       }
 
       column.reset()
@@ -653,7 +629,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   testVector("Int Array", 10, new ArrayType(IntegerType, true)) {
-    (column, _) =>
+    column =>
 
       // Fill the underlying data with all the arrays back to back.
       val data = column.arrayData()
@@ -763,7 +739,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   testVector(
     "Struct Column",
     10,
-    new StructType().add("int", IntegerType).add("double", DoubleType)) { (column, _) =>
+    new StructType().add("int", IntegerType).add("double", DoubleType)) { column =>
     val c1 = column.getChildColumn(0)
     val c2 = column.getChildColumn(1)
     assert(c1.dataType() == IntegerType)
@@ -789,7 +765,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   testVector("Nest Array in Array", 10, new ArrayType(new ArrayType(IntegerType, true), true)) {
-    (column, _) =>
+    column =>
      val childColumn = column.arrayData()
       val data = column.arrayData().arrayData()
       (0 until 6).foreach {
@@ -822,7 +798,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   testVector(
     "Nest Struct in Array",
     10,
-    new ArrayType(structType, true)) { (column, _) =>
+    new ArrayType(structType, true)) { column =>
     val data = column.arrayData()
     val c0 = data.getChildColumn(0)
     val c1 = data.getChildColumn(1)
@@ -851,7 +827,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
     10,
     new StructType()
       .add("int", IntegerType)
-      .add("array", new ArrayType(IntegerType, true))) { (column, _) =>
+      .add("array", new ArrayType(IntegerType, true))) { column =>
     val c0 = column.getChildColumn(0)
     val c1 = column.getChildColumn(1)
     c0.putInt(0, 0)
@@ -880,7 +856,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   testVector(
     "Nest Struct in Struct",
     10,
-    new StructType().add("int", IntegerType).add("struct", subSchema)) { (column, _) =>
+    new StructType().add("int", IntegerType).add("struct", subSchema)) { column =>
     val c0 = column.getChildColumn(0)
     val c1 = column.getChildColumn(1)
     c0.putInt(0, 0)
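The replacement failure messages identify the implementation through the vector's class name instead of a threaded-through `MemoryMode`. A tiny standalone illustration of the convention (object name assumed, not from the patch):

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType

object ErrMsgSketch {
  def main(args: Array[String]): Unit = {
    val column = new OnHeapColumnVector(4, IntegerType)
    try {
      column.putInt(0, 7)
      // On failure this reports "VectorType=OnHeapColumnVector", which
      // pins down the memory mode just as well as the old MemoryMode did.
      val errMsg = "VectorType=" + column.getClass.getSimpleName
      assert(column.getInt(0) == 7, errMsg)
    } finally {
      column.close()
    }
  }
}
```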