Skip to content
Snippets Groups Projects
Commit e15e5741 authored by Michael Armbrust's avatar Michael Armbrust Committed by Patrick Wendell
Browse files

[SQL] Add a custom serializer for maps since they do not have a no-arg constructor.

Author: Michael Armbrust <michael@databricks.com>

Closes #243 from marmbrus/mapSer and squashes the following commits:

54045f7 [Michael Armbrust] Add a custom serializer for maps since they do not have a no-arg constructor.
parent 32cbdfd2
No related branches found
No related tags found
No related merge requests found
...@@ -32,6 +32,7 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { ...@@ -32,6 +32,7 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
kryo.setRegistrationRequired(false) kryo.setRegistrationRequired(false)
kryo.register(classOf[MutablePair[_, _]]) kryo.register(classOf[MutablePair[_, _]])
kryo.register(classOf[Array[Any]]) kryo.register(classOf[Array[Any]])
kryo.register(classOf[scala.collection.immutable.Map$Map1], new MapSerializer)
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]) kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow]) kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]]) kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
...@@ -70,3 +71,20 @@ class BigDecimalSerializer extends Serializer[BigDecimal] { ...@@ -70,3 +71,20 @@ class BigDecimalSerializer extends Serializer[BigDecimal] {
BigDecimal(input.readString()) BigDecimal(input.readString())
} }
} }
/**
* Maps do not have a no arg constructor and so cannot be serialized by default. So, we serialize
* them as `Array[(k,v)]`.
*/
class MapSerializer extends Serializer[Map[_,_]] {
def write(kryo: Kryo, output: Output, map: Map[_,_]) {
kryo.writeObject(output, map.flatMap(e => Seq(e._1, e._2)).toArray)
}
def read(kryo: Kryo, input: Input, tpe: Class[Map[_,_]]): Map[_,_] = {
kryo.readObject(input, classOf[Array[Any]])
.sliding(2,2)
.map { case Array(k,v) => (k,v) }
.toMap
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment