Commit e3fd93f1 authored by Wenchen Fan, committed by gatorsmile

[SPARK-22604][SQL] remove the get address methods from ColumnVector

## What changes were proposed in this pull request?

`nullsNativeAddress` and `valuesNativeAddress` are only used in tests and benchmarks, so they do not need to be part of the top-level `ColumnVector` API.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19818 from cloud-fan/minor.
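
For context, a minimal sketch (assuming Spark's `sql-core` classes of this era on the classpath) of how test code reaches the native addresses after this change: the accessors survive only on the concrete `OffHeapColumnVector`, now marked `@VisibleForTesting`, rather than on the `ColumnVector` base class.

```scala
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.unsafe.Platform

// Allocate the concrete off-heap class; the base ColumnVector no longer
// declares nullsNativeAddress()/valuesNativeAddress().
val col = new OffHeapColumnVector(1024, IntegerType)
try {
  col.putInt(0, 42)
  // Still available here, but only as a @VisibleForTesting accessor.
  val addr = col.valuesNativeAddress()
  assert(Platform.getInt(null, addr) == 42)
} finally {
  col.close()
}
```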
parent 70221903
......
@@ -59,16 +59,6 @@ public final class ArrowColumnVector extends ColumnVector {
     return numNulls() > 0;
   }
 
-  @Override
-  public long nullsNativeAddress() {
-    throw new RuntimeException("Cannot get native address for arrow column");
-  }
-
-  @Override
-  public long valuesNativeAddress() {
-    throw new RuntimeException("Cannot get native address for arrow column");
-  }
-
   @Override
   public void close() {
     if (childColumns != null) {
......
......
@@ -62,13 +62,6 @@ public abstract class ColumnVector implements AutoCloseable {
    */
  public abstract boolean anyNullsSet();
 
-  /**
-   * Returns the off heap ptr for the arrays backing the NULLs and values buffer. Only valid
-   * to call for off heap columns.
-   */
-  public abstract long nullsNativeAddress();
-  public abstract long valuesNativeAddress();
-
  /**
   * Returns whether the value at rowId is NULL.
   */
......
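
With the abstract declarations gone, code that holds only a generic `ColumnVector` can no longer ask for an address at all. A hypothetical helper (the name and shape are mine, not part of this patch) illustrating the downcast that would now be required:

```scala
import org.apache.spark.sql.execution.vectorized.{ColumnVector, OffHeapColumnVector}

// Hypothetical helper: only the off-heap implementation can answer.
def valuesAddressOf(v: ColumnVector): Option[Long] = v match {
  case off: OffHeapColumnVector => Some(off.valuesNativeAddress())
  case _                        => None
}
```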
......
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.vectorized;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
......
@@ -73,12 +75,12 @@ public final class OffHeapColumnVector extends WritableColumnVector {
     reset();
   }
 
-  @Override
+  @VisibleForTesting
   public long valuesNativeAddress() {
     return data;
   }
 
-  @Override
+  @VisibleForTesting
   public long nullsNativeAddress() {
     return nulls;
   }
......
......
@@ -79,15 +79,6 @@ public final class OnHeapColumnVector extends WritableColumnVector {
     reset();
   }
 
-  @Override
-  public long valuesNativeAddress() {
-    throw new RuntimeException("Cannot get native address for on heap column");
-  }
-
-  @Override
-  public long nullsNativeAddress() {
-    throw new RuntimeException("Cannot get native address for on heap column");
-  }
-
   @Override
   public void close() {
     super.close();
......
......
@@ -36,15 +36,6 @@ import org.apache.spark.util.collection.BitSet
  * Benchmark to low level memory access using different ways to manage buffers.
  */
 object ColumnarBatchBenchmark {
 
-  def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = {
-    if (memMode == MemoryMode.OFF_HEAP) {
-      new OffHeapColumnVector(capacity, dt)
-    } else {
-      new OnHeapColumnVector(capacity, dt)
-    }
-  }
-
   // This benchmark reads and writes an array of ints.
   // TODO: there is a big (2x) penalty for a random access API for off heap.
   // Note: carefully if modifying this code. It's hard to reason about the JIT.
......
@@ -151,7 +142,7 @@ object ColumnarBatchBenchmark {
     // Access through the column API with on heap memory
     val columnOnHeap = { i: Int =>
-      val col = allocate(count, IntegerType, MemoryMode.ON_HEAP)
+      val col = new OnHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
......
@@ -170,7 +161,7 @@ object ColumnarBatchBenchmark {
     // Access through the column API with off heap memory
     def columnOffHeap = { i: Int => {
-      val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP)
+      val col = new OffHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
......
@@ -189,7 +180,7 @@ object ColumnarBatchBenchmark {
     // Access by directly getting the buffer backing the column.
     val columnOffheapDirect = { i: Int =>
-      val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP)
+      val col = new OffHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var addr = col.valuesNativeAddress()
......
@@ -255,7 +246,7 @@ object ColumnarBatchBenchmark {
     // Adding values by appending, instead of putting.
     val onHeapAppend = { i: Int =>
-      val col = allocate(count, IntegerType, MemoryMode.ON_HEAP)
+      val col = new OnHeapColumnVector(count, IntegerType)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
......
@@ -330,7 +321,7 @@ object ColumnarBatchBenchmark {
       for (n <- 0L until iters) {
         var i = 0
         while (i < count) {
-          if (i % 2 == 0) b(i) = 1;
+          if (i % 2 == 0) b(i) = 1
           i += 1
         }
         i = 0
......
@@ -351,7 +342,7 @@ object ColumnarBatchBenchmark {
   }
 
   def stringAccess(iters: Long): Unit = {
-    val chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+    val chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
     val random = new Random(0)
 
     def randomString(min: Int, max: Int): String = {
......
@@ -359,10 +350,10 @@ object ColumnarBatchBenchmark {
       val sb = new StringBuilder(len)
       var i = 0
       while (i < len) {
-        sb.append(chars.charAt(random.nextInt(chars.length())));
+        sb.append(chars.charAt(random.nextInt(chars.length())))
         i += 1
       }
-      return sb.toString
+      sb.toString
     }
 
     val minString = 3
......
@@ -373,7 +364,12 @@ object ColumnarBatchBenchmark {
       .map(_.getBytes(StandardCharsets.UTF_8)).toArray
 
     def column(memoryMode: MemoryMode) = { i: Int =>
-      val column = allocate(count, BinaryType, memoryMode)
+      val column = if (memoryMode == MemoryMode.OFF_HEAP) {
+        new OffHeapColumnVector(count, BinaryType)
+      } else {
+        new OnHeapColumnVector(count, BinaryType)
+      }
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
......
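
The `columnOffheapDirect` path above shows why the shared `allocate` helper had to go: reading through raw pointers now requires a reference statically typed as `OffHeapColumnVector`, since `valuesNativeAddress()` is no longer on the base class. A condensed sketch of that access pattern (assuming the same Spark classpath):

```scala
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.unsafe.Platform

val count = 1024
val col = new OffHeapColumnVector(count, IntegerType)
var i = 0
while (i < count) { col.putInt(i, i); i += 1 }

// valuesNativeAddress() is only visible on the concrete off-heap type,
// so `col` cannot be declared as a plain WritableColumnVector here.
var sum = 0L
var addr = col.valuesNativeAddress()
i = 0
while (i < count) { sum += Platform.getInt(null, addr); addr += 4; i += 1 }
col.close()
```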
......
@@ -50,11 +50,11 @@ class ColumnarBatchSuite extends SparkFunSuite {
       name: String,
       size: Int,
       dt: DataType)(
-      block: (WritableColumnVector, MemoryMode) => Unit): Unit = {
+      block: WritableColumnVector => Unit): Unit = {
     test(name) {
       Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { mode =>
         val vector = allocate(size, dt, mode)
-        try block(vector, mode) finally {
+        try block(vector) finally {
           vector.close()
         }
       }
......
@@ -62,7 +62,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   testVector("Null APIs", 1024, IntegerType) {
-    (column, memMode) =>
+    column =>
       val reference = mutable.ArrayBuffer.empty[Boolean]
       var idx = 0
       assert(!column.anyNullsSet())
......
@@ -121,15 +121,11 @@ class ColumnarBatchSuite extends SparkFunSuite {
       reference.zipWithIndex.foreach { v =>
         assert(v._1 == column.isNullAt(v._2))
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.nullsNativeAddress()
-          assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
-        }
       }
   }
 
   testVector("Byte APIs", 1024, ByteType) {
-    (column, memMode) =>
+    column =>
       val reference = mutable.ArrayBuffer.empty[Byte]
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toByte).toArray
......
@@ -173,16 +169,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
       idx += 3
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getByte(v._2), "MemoryMode" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getByte(null, addr + v._2))
-        }
+        assert(v._1 == column.getByte(v._2), "VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Short APIs", 1024, ShortType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Short]
......
@@ -248,16 +240,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getShort(v._2), "Seed = " + seed + " Mem Mode=" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getShort(null, addr + 2 * v._2))
-        }
+        assert(v._1 == column.getShort(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Int APIs", 1024, IntegerType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Int]
......
@@ -329,16 +318,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Mem Mode=" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getInt(null, addr + 4 * v._2))
-        }
+        assert(v._1 == column.getInt(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Long APIs", 1024, LongType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Long]
......
@@ -413,16 +399,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
       reference.zipWithIndex.foreach { v =>
         assert(v._1 == column.getLong(v._2), "idx=" + v._2 +
-          " Seed = " + seed + " MemMode=" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getLong(null, addr + 8 * v._2))
-        }
+          " Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Float APIs", 1024, FloatType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Float]
......
@@ -500,16 +482,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getFloat(v._2), "Seed = " + seed + " MemMode=" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getFloat(null, addr + 4 * v._2))
-        }
+        assert(v._1 == column.getFloat(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("Double APIs", 1024, DoubleType) {
-    (column, memMode) =>
+    column =>
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Double]
......
@@ -587,16 +566,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " MemMode=" + memMode)
-        if (memMode == MemoryMode.OFF_HEAP) {
-          val addr = column.valuesNativeAddress()
-          assert(v._1 == Platform.getDouble(null, addr + 8 * v._2))
-        }
+        assert(v._1 == column.getDouble(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
       }
   }
 
   testVector("String APIs", 6, StringType) {
-    (column, memMode) =>
+    column =>
       val reference = mutable.ArrayBuffer.empty[String]
 
       assert(column.arrayData().elementsAppended == 0)
......
@@ -643,9 +619,9 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.arrayData().elementsAppended == 17 + (s + s).length)
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
-        assert(v._1 == column.getUTF8String(v._2).toString,
-          "MemoryMode" + memMode)
+        val errMsg = "VectorType=" + column.getClass.getSimpleName
+        assert(v._1.length == column.getArrayLength(v._2), errMsg)
+        assert(v._1 == column.getUTF8String(v._2).toString, errMsg)
       }
 
       column.reset()
......
@@ -653,7 +629,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   testVector("Int Array", 10, new ArrayType(IntegerType, true)) {
-    (column, _) =>
+    column =>
       // Fill the underlying data with all the arrays back to back.
       val data = column.arrayData()
......
@@ -763,7 +739,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   testVector(
     "Struct Column",
     10,
-    new StructType().add("int", IntegerType).add("double", DoubleType)) { (column, _) =>
+    new StructType().add("int", IntegerType).add("double", DoubleType)) { column =>
     val c1 = column.getChildColumn(0)
     val c2 = column.getChildColumn(1)
     assert(c1.dataType() == IntegerType)
......
@@ -789,7 +765,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   testVector("Nest Array in Array", 10, new ArrayType(new ArrayType(IntegerType, true), true)) {
-    (column, _) =>
+    column =>
       val childColumn = column.arrayData()
       val data = column.arrayData().arrayData()
       (0 until 6).foreach {
......
@@ -822,7 +798,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   testVector(
     "Nest Struct in Array",
     10,
-    new ArrayType(structType, true)) { (column, _) =>
+    new ArrayType(structType, true)) { column =>
     val data = column.arrayData()
     val c0 = data.getChildColumn(0)
     val c1 = data.getChildColumn(1)
......
@@ -851,7 +827,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
     10,
     new StructType()
       .add("int", IntegerType)
-      .add("array", new ArrayType(IntegerType, true))) { (column, _) =>
+      .add("array", new ArrayType(IntegerType, true))) { column =>
     val c0 = column.getChildColumn(0)
     val c1 = column.getChildColumn(1)
     c0.putInt(0, 0)
......
@@ -880,7 +856,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   testVector(
     "Nest Struct in Struct",
     10,
-    new StructType().add("int", IntegerType).add("struct", subSchema)) { (column, _) =>
+    new StructType().add("int", IntegerType).add("struct", subSchema)) { column =>
     val c0 = column.getChildColumn(0)
     val c1 = column.getChildColumn(1)
     c0.putInt(0, 0)
......
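
Taken together, the suite changes reduce to one refactor: `testVector` still runs each block under both memory modes, but the block no longer receives the mode, and failure messages report the concrete vector class instead. A self-contained approximation (the suite's real `allocate` helper is re-sketched inline here, under the assumption that it dispatches on memory mode the way the benchmark's removed helper did):

```scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types._

class VectorExampleSuite extends SparkFunSuite {
  private def testVector(name: String, size: Int, dt: DataType)
      (block: WritableColumnVector => Unit): Unit = test(name) {
    Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { mode =>
      // Both modes are still exercised; the block just no longer sees which.
      val vector: WritableColumnVector =
        if (mode == MemoryMode.OFF_HEAP) new OffHeapColumnVector(size, dt)
        else new OnHeapColumnVector(size, dt)
      try block(vector) finally vector.close()
    }
  }

  testVector("Int APIs (example)", 16, IntegerType) { column =>
    column.putInt(0, 7)
    // Error messages now identify the implementation by class name.
    assert(column.getInt(0) == 7,
      "VectorType=" + column.getClass.getSimpleName)
  }
}
```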