Skip to content
Snippets Groups Projects
Commit 09b3c56c authored by Sameer Agarwal's avatar Sameer Agarwal Committed by Reynold Xin
Browse files

[SPARK-14752][SQL] Explicitly implement KryoSerialization for LazilyGenerateOrdering

## What changes were proposed in this pull request?

This patch fixes a number of `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException` exceptions reported in [SPARK-15604], [SPARK-14752] etc. (while executing sparkSQL queries with the kryo serializer) by explicitly implementing `KryoSerialization` for `LazilyGenerateOrdering`.

## How was this patch tested?

1. Modified `OrderingSuite` so that all tests in the suite also test kryo serialization (for both interpreted and generated ordering).
2. Manually verified TPC-DS q1.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #13466 from sameeragarwal/kryo.
parent 7c07d176
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.expressions.codegen ...@@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.expressions.codegen
import java.io.ObjectInputStream import java.io.ObjectInputStream
import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.internal.Logging import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions._
...@@ -147,7 +150,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR ...@@ -147,7 +150,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
/** /**
* A lazily generated row ordering comparator. * A lazily generated row ordering comparator.
*/ */
class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] { class LazilyGeneratedOrdering(val ordering: Seq[SortOrder])
extends Ordering[InternalRow] with KryoSerializable {
def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
this(ordering.map(BindReferences.bindReference(_, inputSchema))) this(ordering.map(BindReferences.bindReference(_, inputSchema)))
...@@ -163,6 +167,14 @@ class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[Int ...@@ -163,6 +167,14 @@ class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[Int
in.defaultReadObject() in.defaultReadObject()
generatedOrdering = GenerateOrdering.generate(ordering) generatedOrdering = GenerateOrdering.generate(ordering)
} }
override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException {
kryo.writeObject(out, ordering.toArray)
}
override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
generatedOrdering = GenerateOrdering.generate(kryo.readObject(in, classOf[Array[SortOrder]]))
}
} }
object LazilyGeneratedOrdering { object LazilyGeneratedOrdering {
......
...@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.expressions ...@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.expressions
import scala.math._ import scala.math._
import org.apache.spark.SparkFunSuite import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateOrdering, LazilyGeneratedOrdering}
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
...@@ -44,9 +45,14 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { ...@@ -44,9 +45,14 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
case Ascending => signum(expected) case Ascending => signum(expected)
case Descending => -1 * signum(expected) case Descending => -1 * signum(expected)
} }
val kryo = new KryoSerializer(new SparkConf).newInstance()
val intOrdering = new InterpretedOrdering(sortOrder :: Nil) val intOrdering = new InterpretedOrdering(sortOrder :: Nil)
val genOrdering = GenerateOrdering.generate(sortOrder :: Nil) val genOrdering = new LazilyGeneratedOrdering(sortOrder :: Nil)
Seq(intOrdering, genOrdering).foreach { ordering => val kryoIntOrdering = kryo.deserialize[InterpretedOrdering](kryo.serialize(intOrdering))
val kryoGenOrdering = kryo.deserialize[LazilyGeneratedOrdering](kryo.serialize(genOrdering))
Seq(intOrdering, genOrdering, kryoIntOrdering, kryoGenOrdering).foreach { ordering =>
assert(ordering.compare(rowA, rowA) === 0) assert(ordering.compare(rowA, rowA) === 0)
assert(ordering.compare(rowB, rowB) === 0) assert(ordering.compare(rowB, rowB) === 0)
assert(signum(ordering.compare(rowA, rowB)) === expectedCompareResult) assert(signum(ordering.compare(rowA, rowB)) === expectedCompareResult)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment