diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala index c10829d4f14f365d2c3c8b0e2b400c4b379216e9..f4d35d232e691f1fa1467248ee73f9ae02efaad7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala @@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.expressions.codegen import java.io.ObjectInputStream +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -147,7 +150,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR /** * A lazily generated row ordering comparator. */ -class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] { +class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) + extends Ordering[InternalRow] with KryoSerializable { def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = this(ordering.map(BindReferences.bindReference(_, inputSchema))) @@ -163,6 +167,14 @@ class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[Int in.defaultReadObject() generatedOrdering = GenerateOrdering.generate(ordering) } + + override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException { + kryo.writeObject(out, ordering.toArray) + } + + override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException { + generatedOrdering = GenerateOrdering.generate(kryo.readObject(in, classOf[Array[SortOrder]])) + } } object LazilyGeneratedOrdering { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala index b190d3a00dfb890458bf7982b2a39e83c3e36320..8cc2ab46c0c857430f3a73af640d874a5c26632b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.expressions import scala.math._ -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.serializer.KryoSerializer import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateOrdering, LazilyGeneratedOrdering} import org.apache.spark.sql.types._ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -44,9 +45,14 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper { case Ascending => signum(expected) case Descending => -1 * signum(expected) } + + val kryo = new KryoSerializer(new SparkConf).newInstance() val intOrdering = new InterpretedOrdering(sortOrder :: Nil) - val genOrdering = GenerateOrdering.generate(sortOrder :: Nil) - Seq(intOrdering, genOrdering).foreach { ordering => + val genOrdering = new LazilyGeneratedOrdering(sortOrder :: Nil) + val kryoIntOrdering = kryo.deserialize[InterpretedOrdering](kryo.serialize(intOrdering)) + val kryoGenOrdering = kryo.deserialize[LazilyGeneratedOrdering](kryo.serialize(genOrdering)) + + Seq(intOrdering, genOrdering, kryoIntOrdering, kryoGenOrdering).foreach { ordering => assert(ordering.compare(rowA, rowA) === 0) assert(ordering.compare(rowB, rowB) === 0) assert(signum(ordering.compare(rowA, rowB)) === expectedCompareResult)