Commit 57a4379c authored by Cheng Lian's avatar Cheng Lian Committed by Patrick Wendell
Browse files

[SPARK-1292] In-memory columnar representation for Spark SQL

This PR is rebased from the Catalyst repository, and contains the first version of in-memory columnar representation for Spark SQL. Compression support is not included yet and will be added later in a separate PR.

Author: Cheng Lian <lian@databricks.com>
Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #205 from liancheng/memColumnarSupport and squashes the following commits:

99dba41 [Cheng Lian] Restricted new objects/classes to `private[sql]'
0892ad8 [Cheng Lian] Addressed ScalaStyle issues
af1ad5e [Cheng Lian] Fixed some minor issues introduced during rebasing
0dbf2fb [Cheng Lian] Make necessary renaming due to rebase
a162d4d [Cheng Lian] Removed the unnecessary InMemoryColumnarRelation class
9bcae4b [Cheng Lian] Added Apache license
220ee1e [Cheng Lian] Added table scan operator for in-memory columnar support.
c701c7a [Cheng Lian] Using SparkSqlSerializer for generic object SerDe causes error, made a workaround
ed8608e [Cheng Lian] Added implicit conversion from DataType to ColumnType
b8a645a [Cheng Lian] Replaced KryoSerializer with an updated SparkSqlSerializer
b6c0a49 [Cheng Lian] Minor test suite refactoring
214be73 [Cheng Lian] Refactored BINARY and GENERIC to reduce duplicate code
da2f4d5 [Cheng Lian] Added Apache license
dbf7a38 [Cheng Lian] Added ColumnAccessor and test suite, refactored ColumnBuilder
c01a177 [Cheng Lian] Added column builder classes and test suite
f18ddc6 [Cheng Lian] Added ColumnTypes and test suite
2d09066 [Cheng Lian] Added KryoSerializer
34f3c19 [Cheng Lian] Added TypeTag field to all NativeTypes
acc5c48 [Cheng Lian] Added Hive test files to .gitignore
parent abf6714e
Showing with 1315 additions and 35 deletions
@@ -19,7 +19,9 @@ package org.apache.spark.sql
package catalyst
package types
import expressions.Expression
import scala.reflect.runtime.universe.{typeTag, TypeTag}
import org.apache.spark.sql.catalyst.expressions.Expression
abstract class DataType {
/** Matches any expression that evaluates to this DataType */
@@ -33,11 +35,13 @@ case object NullType extends DataType
abstract class NativeType extends DataType {
type JvmType
@transient val tag: TypeTag[JvmType]
val ordering: Ordering[JvmType]
}
case object StringType extends NativeType {
type JvmType = String
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
}
case object BinaryType extends DataType {
@@ -45,6 +49,7 @@ case object BinaryType extends DataType {
}
case object BooleanType extends NativeType {
type JvmType = Boolean
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
}
@@ -71,6 +76,7 @@ abstract class IntegralType extends NumericType {
case object LongType extends IntegralType {
type JvmType = Long
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Long]]
val integral = implicitly[Integral[Long]]
val ordering = implicitly[Ordering[JvmType]]
@@ -78,6 +84,7 @@ case object LongType extends IntegralType {
case object IntegerType extends IntegralType {
type JvmType = Int
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Int]]
val integral = implicitly[Integral[Int]]
val ordering = implicitly[Ordering[JvmType]]
@@ -85,6 +92,7 @@ case object IntegerType extends IntegralType {
case object ShortType extends IntegralType {
type JvmType = Short
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Short]]
val integral = implicitly[Integral[Short]]
val ordering = implicitly[Ordering[JvmType]]
@@ -92,6 +100,7 @@ case object ShortType extends IntegralType {
case object ByteType extends IntegralType {
type JvmType = Byte
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Byte]]
val integral = implicitly[Integral[Byte]]
val ordering = implicitly[Ordering[JvmType]]
@@ -110,6 +119,7 @@ abstract class FractionalType extends NumericType {
case object DecimalType extends FractionalType {
type JvmType = BigDecimal
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[BigDecimal]]
val fractional = implicitly[Fractional[BigDecimal]]
val ordering = implicitly[Ordering[JvmType]]
@@ -117,6 +127,7 @@ case object DecimalType extends FractionalType {
case object DoubleType extends FractionalType {
type JvmType = Double
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Double]]
val fractional = implicitly[Fractional[Double]]
val ordering = implicitly[Ordering[JvmType]]
@@ -124,6 +135,7 @@ case object DoubleType extends FractionalType {
case object FloatType extends FractionalType {
type JvmType = Float
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Float]]
val fractional = implicitly[Fractional[Float]]
val ordering = implicitly[Ordering[JvmType]]
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
package columnar
import java.nio.{ByteOrder, ByteBuffer}
import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType}
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.execution.SparkSqlSerializer
/**
* An `Iterator`-like trait used to extract values from a columnar byte buffer. When a value is
* extracted from the buffer, instead of being returned directly, it is written into a field of
* a [[MutableRow]]. In this way, the boxing cost is avoided by leveraging the primitive setter
* methods provided by [[MutableRow]].
*/
private[sql] trait ColumnAccessor {
initialize()
protected def initialize()
def hasNext: Boolean
def extractTo(row: MutableRow, ordinal: Int)
protected def underlyingBuffer: ByteBuffer
}
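As a concrete illustration of the pattern described above, here is a minimal REPL-style sketch (not part of this commit): the hypothetical helper `dumpColumn` drains a column buffer into a single reusable `GenericMutableRow`, and the buffer is assumed to come from `ColumnBuilder.build()`.
// Hypothetical helper (illustration only): read every value out of a column buffer,
// reusing one mutable row so primitive values never need to be boxed.
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

def dumpColumn(buffer: java.nio.ByteBuffer): Seq[Any] = {
  val accessor = ColumnAccessor(buffer)
  val row = new GenericMutableRow(1)                 // one field, reused per value
  val values = scala.collection.mutable.ArrayBuffer.empty[Any]
  while (accessor.hasNext) {
    accessor.extractTo(row, 0)                       // primitive setters, no boxing
    values += row(0)
  }
  values.toSeq
}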
private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer)
extends ColumnAccessor {
protected def initialize() {}
def columnType: ColumnType[T, JvmType]
def hasNext = buffer.hasRemaining
def extractTo(row: MutableRow, ordinal: Int) {
doExtractTo(row, ordinal)
}
protected def doExtractTo(row: MutableRow, ordinal: Int)
protected def underlyingBuffer = buffer
}
private[sql] abstract class NativeColumnAccessor[T <: NativeType](
buffer: ByteBuffer,
val columnType: NativeColumnType[T])
extends BasicColumnAccessor[T, T#JvmType](buffer)
with NullableColumnAccessor
private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, BOOLEAN) {
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setBoolean(ordinal, columnType.extract(buffer))
}
}
private[sql] class IntColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, INT) {
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setInt(ordinal, columnType.extract(buffer))
}
}
private[sql] class ShortColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, SHORT) {
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setShort(ordinal, columnType.extract(buffer))
}
}
private[sql] class LongColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, LONG) {
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setLong(ordinal, columnType.extract(buffer))
}
}
private[sql] class ByteColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, BYTE) {
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setByte(ordinal, columnType.extract(buffer))
}
}
private[sql] class DoubleColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, DOUBLE) {
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setDouble(ordinal, columnType.extract(buffer))
}
}
private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, FLOAT) {
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setFloat(ordinal, columnType.extract(buffer))
}
}
private[sql] class StringColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, STRING) {
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row.setString(ordinal, columnType.extract(buffer))
}
}
private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer)
with NullableColumnAccessor {
def columnType = BINARY
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
row(ordinal) = columnType.extract(buffer)
}
}
private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
extends BasicColumnAccessor[DataType, Array[Byte]](buffer)
with NullableColumnAccessor {
def columnType = GENERIC
override protected def doExtractTo(row: MutableRow, ordinal: Int) {
val serialized = columnType.extract(buffer)
row(ordinal) = SparkSqlSerializer.deserialize[Any](serialized)
}
}
private[sql] object ColumnAccessor {
def apply(b: ByteBuffer): ColumnAccessor = {
// The first 4 bytes in the buffer indicate the column type.
val buffer = b.duplicate().order(ByteOrder.nativeOrder())
val columnTypeId = buffer.getInt()
columnTypeId match {
case INT.typeId => new IntColumnAccessor(buffer)
case LONG.typeId => new LongColumnAccessor(buffer)
case FLOAT.typeId => new FloatColumnAccessor(buffer)
case DOUBLE.typeId => new DoubleColumnAccessor(buffer)
case BOOLEAN.typeId => new BooleanColumnAccessor(buffer)
case BYTE.typeId => new ByteColumnAccessor(buffer)
case SHORT.typeId => new ShortColumnAccessor(buffer)
case STRING.typeId => new StringColumnAccessor(buffer)
case BINARY.typeId => new BinaryColumnAccessor(buffer)
case GENERIC.typeId => new GenericColumnAccessor(buffer)
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
package columnar
import java.nio.{ByteOrder, ByteBuffer}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.SparkSqlSerializer
private[sql] trait ColumnBuilder {
/**
* Initializes with an approximate lower bound on the expected number of elements in this column.
*/
def initialize(initialSize: Int, columnName: String = "")
def appendFrom(row: Row, ordinal: Int)
def build(): ByteBuffer
}
private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder {
import ColumnBuilder._
private var columnName: String = _
protected var buffer: ByteBuffer = _
def columnType: ColumnType[T, JvmType]
override def initialize(initialSize: Int, columnName: String = "") = {
val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
this.columnName = columnName
buffer = ByteBuffer.allocate(4 + 4 + size * columnType.defaultSize)
buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
}
// Have to give a concrete implementation to make mixin possible
override def appendFrom(row: Row, ordinal: Int) {
doAppendFrom(row, ordinal)
}
// Concrete `ColumnBuilder`s can override this method to append values
protected def doAppendFrom(row: Row, ordinal: Int)
// Helper method to append primitive values (to avoid boxing cost)
protected def appendValue(v: JvmType) {
buffer = ensureFreeSpace(buffer, columnType.actualSize(v))
columnType.append(v, buffer)
}
override def build() = {
buffer.limit(buffer.position()).rewind()
buffer
}
}
private[sql] abstract class NativeColumnBuilder[T <: NativeType](
val columnType: NativeColumnType[T])
extends BasicColumnBuilder[T, T#JvmType]
with NullableColumnBuilder
private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getBoolean(ordinal))
}
}
private[sql] class IntColumnBuilder extends NativeColumnBuilder(INT) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getInt(ordinal))
}
}
private[sql] class ShortColumnBuilder extends NativeColumnBuilder(SHORT) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getShort(ordinal))
}
}
private[sql] class LongColumnBuilder extends NativeColumnBuilder(LONG) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getLong(ordinal))
}
}
private[sql] class ByteColumnBuilder extends NativeColumnBuilder(BYTE) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getByte(ordinal))
}
}
private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getDouble(ordinal))
}
}
private[sql] class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getFloat(ordinal))
}
}
private[sql] class StringColumnBuilder extends NativeColumnBuilder(STRING) {
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row.getString(ordinal))
}
}
private[sql] class BinaryColumnBuilder
extends BasicColumnBuilder[BinaryType.type, Array[Byte]]
with NullableColumnBuilder {
def columnType = BINARY
override def doAppendFrom(row: Row, ordinal: Int) {
appendValue(row(ordinal).asInstanceOf[Array[Byte]])
}
}
// TODO (lian) Add support for array, struct and map
private[sql] class GenericColumnBuilder
extends BasicColumnBuilder[DataType, Array[Byte]]
with NullableColumnBuilder {
def columnType = GENERIC
override def doAppendFrom(row: Row, ordinal: Int) {
val serialized = SparkSqlSerializer.serialize(row(ordinal))
buffer = ColumnBuilder.ensureFreeSpace(buffer, columnType.actualSize(serialized))
columnType.append(serialized, buffer)
}
}
private[sql] object ColumnBuilder {
val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 1024 // 10 MB
private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
if (orig.remaining >= size) {
orig
} else {
// Grow by at least `size` bytes, or by 1/8 of the current capacity, whichever is larger
val capacity = orig.capacity()
val newSize = capacity + size.max(capacity / 8 + 1)
val pos = orig.position()
orig.clear()
ByteBuffer
.allocate(newSize)
.order(ByteOrder.nativeOrder())
.put(orig.array(), 0, pos)
}
}
def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
val builder = (typeId match {
case INT.typeId => new IntColumnBuilder
case LONG.typeId => new LongColumnBuilder
case FLOAT.typeId => new FloatColumnBuilder
case DOUBLE.typeId => new DoubleColumnBuilder
case BOOLEAN.typeId => new BooleanColumnBuilder
case BYTE.typeId => new ByteColumnBuilder
case SHORT.typeId => new ShortColumnBuilder
case STRING.typeId => new StringColumnBuilder
case BINARY.typeId => new BinaryColumnBuilder
case GENERIC.typeId => new GenericColumnBuilder
}).asInstanceOf[ColumnBuilder]
builder.initialize(initialSize, columnName)
builder
}
}
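For orientation, here is a hedged, REPL-style sketch (not part of the patch) of how the builder above is driven; the column name "name" and the sample strings are illustrative only.
// Illustration only: append a few strings, let ensureFreeSpace grow the buffer on
// demand, then inspect the header laid down by ColumnBuilder/NullableColumnBuilder.
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

val builder = ColumnBuilder(STRING.typeId, initialSize = 2, columnName = "name")
val row = new GenericMutableRow(1)
Seq("hello", "columnar", "world").foreach { s =>
  row(0) = s
  builder.appendFrom(row, 0)                  // buffer grows automatically if needed
}
val built = builder.build()
assert(built.getInt() == STRING.typeId)       // 4-byte column type ID
assert(built.getInt() == 0)                   // 4-byte null count (no nulls appended)
assert(STRING.extract(built) == "hello")      // values follow, length-prefixed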
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
package columnar
import java.nio.ByteBuffer
import org.apache.spark.sql.catalyst.types._
/**
* An abstract class that represents the type of a column. Used to append/extract Java objects
* into/from the underlying [[ByteBuffer]] of a column.
*
* @param typeId A unique ID representing the type.
* @param defaultSize Default size in bytes for one element of type T (e.g. 4 for `Int`).
* @tparam T Scala data type for the column.
* @tparam JvmType Underlying Java type to represent the elements.
*/
private[sql] sealed abstract class ColumnType[T <: DataType, JvmType](
val typeId: Int,
val defaultSize: Int) {
/**
* Extracts a value out of the buffer at the buffer's current position.
*/
def extract(buffer: ByteBuffer): JvmType
/**
* Appends the given value v of type T into the given ByteBuffer.
*/
def append(v: JvmType, buffer: ByteBuffer)
/**
* Returns the size of the value. This is used to calculate the size of variable length types
* such as byte arrays and strings.
*/
def actualSize(v: JvmType): Int = defaultSize
/**
* Creates a duplicated copy of the value.
*/
def clone(v: JvmType): JvmType = v
}
private[sql] abstract class NativeColumnType[T <: NativeType](
val dataType: T,
typeId: Int,
defaultSize: Int)
extends ColumnType[T, T#JvmType](typeId, defaultSize) {
/**
* Scala TypeTag. Can be used to create primitive arrays and hash tables.
*/
def scalaTag = dataType.tag
}
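The doc comment above hints at why this patch adds a `TypeTag` to every `NativeType`. The following hypothetical helper (a hedged sketch, not part of the commit) shows one such use: deriving a `ClassTag` from the tag in order to allocate an unboxed array for the column's `JvmType`.
// Hypothetical helper (illustration only): allocate a primitive array sized for a
// native column, using the TypeTag carried by its NativeColumnType.
import scala.reflect.ClassTag

def newColumnArray[T <: NativeType](columnType: NativeColumnType[T], size: Int): Array[T#JvmType] = {
  val tag = columnType.scalaTag
  implicit val classTag: ClassTag[T#JvmType] =
    ClassTag(tag.mirror.runtimeClass(tag.tpe))
  new Array[T#JvmType](size)                  // e.g. an unboxed Array[Int] for INT
}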
private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
def append(v: Int, buffer: ByteBuffer) {
buffer.putInt(v)
}
def extract(buffer: ByteBuffer) = {
buffer.getInt()
}
}
private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
override def append(v: Long, buffer: ByteBuffer) {
buffer.putLong(v)
}
override def extract(buffer: ByteBuffer) = {
buffer.getLong()
}
}
private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
override def append(v: Float, buffer: ByteBuffer) {
buffer.putFloat(v)
}
override def extract(buffer: ByteBuffer) = {
buffer.getFloat()
}
}
private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
override def append(v: Double, buffer: ByteBuffer) {
buffer.putDouble(v)
}
override def extract(buffer: ByteBuffer) = {
buffer.getDouble()
}
}
private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
override def append(v: Boolean, buffer: ByteBuffer) {
buffer.put(if (v) 1.toByte else 0.toByte)
}
override def extract(buffer: ByteBuffer) = {
if (buffer.get() == 1) true else false
}
}
private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
override def append(v: Byte, buffer: ByteBuffer) {
buffer.put(v)
}
override def extract(buffer: ByteBuffer) = {
buffer.get()
}
}
private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
override def append(v: Short, buffer: ByteBuffer) {
buffer.putShort(v)
}
override def extract(buffer: ByteBuffer) = {
buffer.getShort()
}
}
private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
override def actualSize(v: String): Int = v.getBytes.length + 4
override def append(v: String, buffer: ByteBuffer) {
val stringBytes = v.getBytes()
buffer.putInt(stringBytes.length).put(stringBytes, 0, stringBytes.length)
}
override def extract(buffer: ByteBuffer) = {
val length = buffer.getInt()
val stringBytes = new Array[Byte](length)
buffer.get(stringBytes, 0, length)
new String(stringBytes)
}
}
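A quick hedged round trip through the two encodings above (fixed-width INT, length-prefixed STRING), in the spirit of ColumnTypeSuite further down; the buffer size is arbitrary.
// Illustration only: ColumnTypes read and write at the buffer's current position.
import java.nio.{ByteBuffer, ByteOrder}

val buf = ByteBuffer.allocate(64).order(ByteOrder.nativeOrder())
INT.append(42, buf)                   // 4 fixed bytes
STRING.append("columnar", buf)        // 4-byte length prefix + string bytes
buf.rewind()
assert(INT.extract(buf) == 42)
assert(STRING.extract(buf) == "columnar")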
private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
typeId: Int,
defaultSize: Int)
extends ColumnType[T, Array[Byte]](typeId, defaultSize) {
override def actualSize(v: Array[Byte]) = v.length + 4
override def append(v: Array[Byte], buffer: ByteBuffer) {
buffer.putInt(v.length).put(v, 0, v.length)
}
override def extract(buffer: ByteBuffer) = {
val length = buffer.getInt()
val bytes = new Array[Byte](length)
buffer.get(bytes, 0, length)
bytes
}
}
private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16)
// Used to process generic objects (all types other than those listed above). Objects are
// serialized before being appended to the column `ByteBuffer`, and are also extracted as
// serialized byte arrays.
private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16)
private[sql] object ColumnType {
implicit def dataTypeToColumnType(dataType: DataType): ColumnType[_, _] = {
dataType match {
case IntegerType => INT
case LongType => LONG
case FloatType => FLOAT
case DoubleType => DOUBLE
case BooleanType => BOOLEAN
case ByteType => BYTE
case ShortType => SHORT
case StringType => STRING
case BinaryType => BINARY
case _ => GENERIC
}
}
}
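The implicit conversion above is what later lets `InMemoryColumnarTableScan` ask a Catalyst `DataType` for its column `typeId` directly. A hedged, REPL-style sketch (assuming the columnar package is in scope):
// Illustration only: with ColumnType._ imported, DataTypes gain ColumnType members.
import org.apache.spark.sql.catalyst.types._
import ColumnType._

assert(IntegerType.typeId == INT.typeId)
assert(StringType.typeId == STRING.typeId)
assert(NullType.typeId == GENERIC.typeId)   // unlisted types fall back to GENERIC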
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.columnar
import java.nio.{ByteOrder, ByteBuffer}
import org.apache.spark.sql.catalyst.expressions.MutableRow
private[sql] trait NullableColumnAccessor extends ColumnAccessor {
private var nullsBuffer: ByteBuffer = _
private var nullCount: Int = _
private var seenNulls: Int = 0
private var nextNullIndex: Int = _
private var pos: Int = 0
abstract override def initialize() {
nullsBuffer = underlyingBuffer.duplicate().order(ByteOrder.nativeOrder())
nullCount = nullsBuffer.getInt()
nextNullIndex = if (nullCount > 0) nullsBuffer.getInt() else -1
pos = 0
underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4)
super.initialize()
}
abstract override def extractTo(row: MutableRow, ordinal: Int) {
if (pos == nextNullIndex) {
seenNulls += 1
if (seenNulls < nullCount) {
nextNullIndex = nullsBuffer.getInt()
}
row.setNullAt(ordinal)
} else {
super.extractTo(row, ordinal)
}
pos += 1
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
package columnar
import java.nio.{ByteOrder, ByteBuffer}
/**
* Builds a nullable column. The byte buffer of a nullable column contains:
* - 4 bytes for the column type ID
* - 4 bytes for the null count (number of nulls)
* - 4 bytes for each null position, in ascending order
* - the non-null data (compression type, data...)
*/
private[sql] trait NullableColumnBuilder extends ColumnBuilder {
private var nulls: ByteBuffer = _
private var pos: Int = _
private var nullCount: Int = _
abstract override def initialize(initialSize: Int, columnName: String) {
nulls = ByteBuffer.allocate(1024)
nulls.order(ByteOrder.nativeOrder())
pos = 0
nullCount = 0
super.initialize(initialSize, columnName)
}
abstract override def appendFrom(row: Row, ordinal: Int) {
if (row.isNullAt(ordinal)) {
nulls = ColumnBuilder.ensureFreeSpace(nulls, 4)
nulls.putInt(pos)
nullCount += 1
} else {
super.appendFrom(row, ordinal)
}
pos += 1
}
abstract override def build(): ByteBuffer = {
val nonNulls = super.build()
val typeId = nonNulls.getInt()
val nullDataLen = nulls.position()
nulls.limit(nullDataLen)
nulls.rewind()
// The column type ID is moved to the front, followed by the null count, then the non-null data
//
// +---------+
// | 4 bytes | Column type ID
// +---------+
// | 4 bytes | Null count
// +---------+
// | ... | Null positions (if null count is not zero)
// +---------+
// | ... | Non-null part (without column type ID)
// +---------+
val buffer = ByteBuffer
.allocate(4 + nullDataLen + nonNulls.limit)
.order(ByteOrder.nativeOrder())
.putInt(typeId)
.putInt(nullCount)
.put(nulls)
.put(nonNulls)
buffer.rewind()
buffer
}
}
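To make the layout above concrete, here is a small hedged, REPL-style example (not part of the commit) that builds an INT column containing 1, null, 2 and walks the resulting header; reading such a buffer back through `ColumnAccessor` restores the null in place.
// Illustration only: header produced by NullableColumnBuilder for (1, null, 2).
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

val row = new GenericMutableRow(1)
val builder = ColumnBuilder(INT.typeId, initialSize = 4)
row(0) = 1;    builder.appendFrom(row, 0)
row(0) = null; builder.appendFrom(row, 0)
row(0) = 2;    builder.appendFrom(row, 0)

val built = builder.build()
assert(built.getInt() == INT.typeId)    // column type ID comes first
assert(built.getInt() == 1)             // one null value...
assert(built.getInt() == 1)             // ...at position 1
assert(INT.extract(built) == 1)         // then the non-null data: 1, 2
assert(INT.extract(built) == 2)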
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
package columnar
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
extends LeafNode {
// For implicit conversion from `DataType` to `ColumnType`
import ColumnType._
override def output: Seq[Attribute] = attributes
lazy val cachedColumnBuffers = {
val output = child.output
val cached = child.execute().mapPartitions { iterator =>
val columnBuilders = output.map { a =>
ColumnBuilder(a.dataType.typeId, 0, a.name)
}.toArray
var row: Row = null
while (iterator.hasNext) {
row = iterator.next()
var i = 0
while (i < row.length) {
columnBuilders(i).appendFrom(row, i)
i += 1
}
}
Iterator.single(columnBuilders.map(_.build()))
}.cache()
cached.setName(child.toString)
// Force the materialization of the cached RDD.
cached.count()
cached
}
override def execute() = {
cachedColumnBuffers.mapPartitions { iterator =>
val columnBuffers = iterator.next()
assert(!iterator.hasNext)
new Iterator[Row] {
val columnAccessors = columnBuffers.map(ColumnAccessor(_))
val nextRow = new GenericMutableRow(columnAccessors.length)
override def next() = {
var i = 0
while (i < nextRow.length) {
columnAccessors(i).extractTo(nextRow, i)
i += 1
}
nextRow
}
override def hasNext = columnAccessors.head.hasNext
}
}
}
}
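A hedged usage sketch of the new operator (mirroring ColumnarQuerySuite further down): wrap an already planned `SparkPlan` so that repeated executions are served from the cached column buffers instead of re-running the child plan. The helper name `cacheAsColumns` is hypothetical.
// Illustration only: cache a physical plan's output in columnar form.
import org.apache.spark.sql.execution.{SparkLogicalPlan, SparkPlan}

def cacheAsColumns(plan: SparkPlan) =
  SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))

// e.g. cacheAsColumns(TestSQLContext.executePlan(testData.logicalPlan).executedPlan)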
@@ -18,14 +18,8 @@
package org.apache.spark.sql
package execution
import java.nio.ByteBuffer
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Output, Input}
import org.apache.spark.{SparkConf, RangePartitioner, HashPartitioner}
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.MutablePair
import catalyst.rules.Rule
@@ -33,33 +27,6 @@ import catalyst.errors._
import catalyst.expressions._
import catalyst.plans.physical._
private class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
override def newKryo(): Kryo = {
val kryo = new Kryo
kryo.setRegistrationRequired(true)
kryo.register(classOf[MutablePair[_,_]])
kryo.register(classOf[Array[Any]])
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
kryo.setReferences(false)
kryo.setClassLoader(this.getClass.getClassLoader)
kryo
}
}
private class BigDecimalSerializer extends Serializer[BigDecimal] {
def write(kryo: Kryo, output: Output, bd: math.BigDecimal) {
// TODO: There are probably more efficient representations than strings...
output.writeString(bd.toString)
}
def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
BigDecimal(input.readString())
}
}
case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
override def outputPartitioning = newPartitioning
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
package execution
import java.nio.ByteBuffer
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Serializer, Kryo}
import org.apache.spark.{SparkEnv, SparkConf}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.MutablePair
class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
override def newKryo(): Kryo = {
val kryo = new Kryo()
kryo.setRegistrationRequired(false)
kryo.register(classOf[MutablePair[_, _]])
kryo.register(classOf[Array[Any]])
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
kryo.setReferences(false)
kryo.setClassLoader(this.getClass.getClassLoader)
kryo
}
}
object SparkSqlSerializer {
// TODO (lian) Using KryoSerializer here is a workaround, needs further investigation
// Using SparkSqlSerializer here makes BasicQuerySuite fail because of a Kryo serialization
// related error.
@transient lazy val ser: KryoSerializer = {
val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
new KryoSerializer(sparkConf)
}
def serialize[T](o: T): Array[Byte] = {
ser.newInstance().serialize(o).array()
}
def deserialize[T](bytes: Array[Byte]): T = {
ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
}
}
class BigDecimalSerializer extends Serializer[BigDecimal] {
def write(kryo: Kryo, output: Output, bd: math.BigDecimal) {
// TODO: There are probably more efficient representations than strings...
output.writeString(bd.toString())
}
def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
BigDecimal(input.readString())
}
}
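For reference, a hedged round trip through the helper object above (the sample value mirrors the one used in ColumnTypeSuite below):
// Illustration only: Kryo round trip via the SparkSqlSerializer helpers.
val obj = Map(1 -> "spark", 2 -> "sql")
val bytes: Array[Byte] = SparkSqlSerializer.serialize(obj)
assert(SparkSqlSerializer.deserialize[Map[Int, String]](bytes) == obj)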
@@ -33,7 +33,7 @@ import TestSQLContext._
class QueryTest extends FunSuite {
/**
* Runs the plan and makes sure the answer matches the expected result.
* @param plan the query to be executed
* @param rdd the [[SchemaRDD]] to be executed
* @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].
*/
protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Any): Unit = {
......
package org.apache.spark.sql
package columnar
import java.nio.ByteBuffer
import scala.util.Random
import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.SparkSqlSerializer
class ColumnTypeSuite extends FunSuite {
val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING, BINARY, GENERIC)
test("defaultSize") {
val defaultSize = Seq(4, 2, 8, 1, 8, 4, 8, 16, 16)
columnTypes.zip(defaultSize).foreach { case (columnType, size) =>
assert(columnType.defaultSize === size)
}
}
test("actualSize") {
val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 11)
val actualSizes = Seq(
INT.actualSize(Int.MaxValue),
SHORT.actualSize(Short.MaxValue),
LONG.actualSize(Long.MaxValue),
BYTE.actualSize(Byte.MaxValue),
DOUBLE.actualSize(Double.MaxValue),
FLOAT.actualSize(Float.MaxValue),
STRING.actualSize("hello"),
BINARY.actualSize(new Array[Byte](4)),
GENERIC.actualSize(SparkSqlSerializer.serialize(Map(1 -> "a"))))
expectedSizes.zip(actualSizes).foreach { case (expected, actual) =>
assert(expected === actual)
}
}
testNumericColumnType[BooleanType.type, Boolean](
BOOLEAN,
Array.fill(4)(Random.nextBoolean()),
ByteBuffer.allocate(32),
(buffer: ByteBuffer, v: Boolean) => {
buffer.put((if (v) 1 else 0).toByte)
},
(buffer: ByteBuffer) => {
buffer.get() == 1
})
testNumericColumnType[IntegerType.type, Int](
INT,
Array.fill(4)(Random.nextInt()),
ByteBuffer.allocate(32),
(_: ByteBuffer).putInt(_),
(_: ByteBuffer).getInt)
testNumericColumnType[ShortType.type, Short](
SHORT,
Array.fill(4)(Random.nextInt(Short.MaxValue).asInstanceOf[Short]),
ByteBuffer.allocate(32),
(_: ByteBuffer).putShort(_),
(_: ByteBuffer).getShort)
testNumericColumnType[LongType.type, Long](
LONG,
Array.fill(4)(Random.nextLong()),
ByteBuffer.allocate(64),
(_: ByteBuffer).putLong(_),
(_: ByteBuffer).getLong)
testNumericColumnType[ByteType.type, Byte](
BYTE,
Array.fill(4)(Random.nextInt(Byte.MaxValue).asInstanceOf[Byte]),
ByteBuffer.allocate(64),
(_: ByteBuffer).put(_),
(_: ByteBuffer).get)
testNumericColumnType[DoubleType.type, Double](
DOUBLE,
Array.fill(4)(Random.nextDouble()),
ByteBuffer.allocate(64),
(_: ByteBuffer).putDouble(_),
(_: ByteBuffer).getDouble)
testNumericColumnType[FloatType.type, Float](
FLOAT,
Array.fill(4)(Random.nextFloat()),
ByteBuffer.allocate(64),
(_: ByteBuffer).putFloat(_),
(_: ByteBuffer).getFloat)
test("STRING") {
val buffer = ByteBuffer.allocate(128)
val seq = Array("hello", "world", "spark", "sql")
seq.map(_.getBytes).foreach { bytes: Array[Byte] =>
buffer.putInt(bytes.length).put(bytes)
}
buffer.rewind()
seq.foreach { s =>
assert(s === STRING.extract(buffer))
}
buffer.rewind()
seq.foreach(STRING.append(_, buffer))
buffer.rewind()
seq.foreach { s =>
val length = buffer.getInt
assert(length === s.getBytes.length)
val bytes = new Array[Byte](length)
buffer.get(bytes, 0, length)
assert(s === new String(bytes))
}
}
test("BINARY") {
val buffer = ByteBuffer.allocate(128)
val seq = Array.fill(4) {
val bytes = new Array[Byte](4)
Random.nextBytes(bytes)
bytes
}
seq.foreach { bytes =>
buffer.putInt(bytes.length).put(bytes)
}
buffer.rewind()
seq.foreach { b =>
assert(b === BINARY.extract(buffer))
}
buffer.rewind()
seq.foreach(BINARY.append(_, buffer))
buffer.rewind()
seq.foreach { b =>
val length = buffer.getInt
assert(length === b.length)
val bytes = new Array[Byte](length)
buffer.get(bytes, 0, length)
assert(b === bytes)
}
}
test("GENERIC") {
val buffer = ByteBuffer.allocate(512)
val obj = Map(1 -> "spark", 2 -> "sql")
val serializedObj = SparkSqlSerializer.serialize(obj)
GENERIC.append(SparkSqlSerializer.serialize(obj), buffer)
buffer.rewind()
val length = buffer.getInt()
assert(length === serializedObj.length)
val bytes = new Array[Byte](length)
buffer.get(bytes, 0, length)
assert(obj === SparkSqlSerializer.deserialize(bytes))
buffer.rewind()
buffer.putInt(serializedObj.length).put(serializedObj)
buffer.rewind()
assert(obj === SparkSqlSerializer.deserialize(GENERIC.extract(buffer)))
}
def testNumericColumnType[T <: DataType, JvmType](
columnType: ColumnType[T, JvmType],
seq: Seq[JvmType],
buffer: ByteBuffer,
putter: (ByteBuffer, JvmType) => Unit,
getter: (ByteBuffer) => JvmType) {
val columnTypeName = columnType.getClass.getSimpleName.stripSuffix("$")
test(s"$columnTypeName.extract") {
buffer.rewind()
seq.foreach(putter(buffer, _))
buffer.rewind()
seq.foreach { i =>
assert(i === columnType.extract(buffer))
}
}
test(s"$columnTypeName.append") {
buffer.rewind()
seq.foreach(columnType.append(_, buffer))
buffer.rewind()
seq.foreach { i =>
assert(i === getter(buffer))
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.columnar
import org.apache.spark.sql.execution.SparkLogicalPlan
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{TestData, DslQuerySuite}
class ColumnarQuerySuite extends DslQuerySuite {
import TestData._
import TestSQLContext._
test("simple columnar query") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
checkAnswer(scan, testData.collect().toSeq)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.columnar
import scala.util.Random
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
// TODO Enrich test data
object ColumnarTestData {
object GenericMutableRow {
def apply(values: Any*) = {
val row = new GenericMutableRow(values.length)
row.indices.foreach { i =>
row(i) = values(i)
}
row
}
}
def randomBytes(length: Int) = {
val bytes = new Array[Byte](length)
Random.nextBytes(bytes)
bytes
}
val nonNullRandomRow = GenericMutableRow(
Random.nextInt(),
Random.nextLong(),
Random.nextFloat(),
Random.nextDouble(),
Random.nextBoolean(),
Random.nextInt(Byte.MaxValue).asInstanceOf[Byte],
Random.nextInt(Short.MaxValue).asInstanceOf[Short],
Random.nextString(Random.nextInt(64)),
randomBytes(Random.nextInt(64)),
Map(Random.nextInt() -> Random.nextString(4)))
val nullRow = GenericMutableRow(Seq.fill(10)(null): _*)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
package columnar
import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.types.DataType
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
class NullableColumnAccessorSuite extends FunSuite {
import ColumnarTestData._
Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach {
testNullableColumnAccessor(_)
}
def testNullableColumnAccessor[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) {
val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
test(s"$typeName accessor: empty column") {
val builder = ColumnBuilder(columnType.typeId, 4)
val accessor = ColumnAccessor(builder.build())
assert(!accessor.hasNext)
}
test(s"$typeName accessor: access null values") {
val builder = ColumnBuilder(columnType.typeId, 4)
(0 until 4).foreach { _ =>
builder.appendFrom(nonNullRandomRow, columnType.typeId)
builder.appendFrom(nullRow, columnType.typeId)
}
val accessor = ColumnAccessor(builder.build())
val row = new GenericMutableRow(1)
(0 until 4).foreach { _ =>
accessor.extractTo(row, 0)
assert(row(0) === nonNullRandomRow(columnType.typeId))
accessor.extractTo(row, 0)
assert(row(0) === null)
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
package columnar
import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.types.DataType
import org.apache.spark.sql.execution.SparkSqlSerializer
class NullableColumnBuilderSuite extends FunSuite {
import ColumnarTestData._
Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach {
testNullableColumnBuilder(_)
}
def testNullableColumnBuilder[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) {
val columnBuilder = ColumnBuilder(columnType.typeId)
val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
test(s"$typeName column builder: empty column") {
columnBuilder.initialize(4)
val buffer = columnBuilder.build()
// For column type ID
assert(buffer.getInt() === columnType.typeId)
// For null count
assert(buffer.getInt === 0)
assert(!buffer.hasRemaining)
}
test(s"$typeName column builder: buffer size auto growth") {
columnBuilder.initialize(4)
(0 until 4) foreach { _ =>
columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId)
}
val buffer = columnBuilder.build()
// For column type ID
assert(buffer.getInt() === columnType.typeId)
// For null count
assert(buffer.getInt() === 0)
}
test(s"$typeName column builder: null values") {
columnBuilder.initialize(4)
(0 until 4) foreach { _ =>
columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId)
columnBuilder.appendFrom(nullRow, columnType.typeId)
}
val buffer = columnBuilder.build()
// For column type ID
assert(buffer.getInt() === columnType.typeId)
// For null count
assert(buffer.getInt() === 4)
// For null positions
(1 to 7 by 2).foreach(i => assert(buffer.getInt() === i))
// For non-null values
(0 until 4).foreach { _ =>
val actual = if (columnType == GENERIC) {
SparkSqlSerializer.deserialize[Any](GENERIC.extract(buffer))
} else {
columnType.extract(buffer)
}
assert(actual === nonNullRandomRow(columnType.typeId))
}
assert(!buffer.hasRemaining)
}
}
}