Commit efdaeb11 authored by Ian O Connell, committed by Michael Armbrust

[SPARK-2102][SQL][CORE] Add option for kryo registration required and use a resource pool in Spark SQL for Kryo instances.

Author: Ian O Connell <ioconnell@twitter.com>

Closes #1377 from ianoc/feature/SPARK-2102 and squashes the following commits:

5498566 [Ian O Connell] Docs update suggested by Patrick
20e8555 [Ian O Connell] Slight style change
f92c294 [Ian O Connell] Add docs for new KryoSerializer option
f3735c8 [Ian O Connell] Add using a kryo resource pool for the SqlSerializer
4e5c342 [Ian O Connell] Register the SparkConf for kryo, it gets swept into serialization
665805a [Ian O Connell] Add a spark.kryo.registrationRequired option for configuring the Kryo Serializer
parent 1871574a
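The new flag is set through SparkConf like any other Kryo option. A minimal sketch of exercising it follows; the application class MyRecord, the object name, and the local master are illustrative, not part of this commit:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical application class, deliberately left unregistered.
case class MyRecord(id: Long, name: String)

object RegistrationRequiredDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("registration-required-demo")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // The option added by this commit; defaults to false.
      .set("spark.kryo.registrationRequired", "true")

    val sc = new SparkContext(conf)
    // Any operation that Kryo-serializes MyRecord (e.g. a shuffle) should now
    // fail fast with an exception naming the unregistered class:
    // sc.parallelize(Seq(MyRecord(1L, "a"))).map(r => (r.id, r)).groupByKey().count()
    sc.stop()
  }
}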
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

@@ -48,6 +48,7 @@ class KryoSerializer(conf: SparkConf)
   private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
   private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
+  private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
   private val registrator = conf.getOption("spark.kryo.registrator")

   def newKryoOutput() = new KryoOutput(bufferSize)
@@ -55,6 +56,7 @@ class KryoSerializer(conf: SparkConf)
   def newKryo(): Kryo = {
     val instantiator = new EmptyScalaKryoInstantiator
     val kryo = instantiator.newKryo()
+    kryo.setRegistrationRequired(registrationRequired)
     val classLoader = Thread.currentThread.getContextClassLoader

     // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
@@ -185,7 +187,8 @@ private[serializer] object KryoSerializer {
     classOf[MapStatus],
     classOf[BlockManagerId],
     classOf[Array[Byte]],
-    classOf[BoundedPriorityQueue[_]]
+    classOf[BoundedPriorityQueue[_]],
+    classOf[SparkConf]
   )
 }
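With registration required, user classes must be registered explicitly, typically through the existing spark.kryo.registrator hook. A hedged sketch, reusing the hypothetical MyRecord class from above (the registrator class and package names are examples):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Example registrator; MyRecord is the hypothetical class from the earlier sketch.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyRecord])
    // Arrays of a class are distinct types in Kryo and need their own entry.
    kryo.register(classOf[Array[MyRecord]])
  }
}

It would then be wired up next to the new setting with something like conf.set("spark.kryo.registrator", "com.example.MyRegistrator").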
docs/configuration.md

@@ -388,6 +388,17 @@ Apart from these, the following properties are also available, and may be useful
     case.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kryo.registrationRequired</code></td>
+  <td>false</td>
+  <td>
+    Whether to require registration with Kryo. If set to 'true', Kryo will throw an exception
+    if an unregistered class is serialized. If set to false (the default), Kryo will write
+    unregistered class names along with each object. Writing class names can cause
+    significant performance overhead, so enabling this option can enforce strictly that a
+    user has not omitted classes from registration.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kryoserializer.buffer.mb</code></td>
   <td>2</td>
@@ -497,9 +508,9 @@ Apart from these, the following properties are also available, and may be useful
 <tr>
   <td>spark.hadoop.validateOutputSpecs</td>
   <td>true</td>
   <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
     used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
     output directories. We recommend that users do not disable this except if trying to achieve compatibility with
     previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
 </tr>
 </table>
@@ -861,7 +872,7 @@ Apart from these, the following properties are also available, and may be useful
 </table>

 #### Cluster Managers

 Each cluster manager in Spark has additional configuration options. Configurations
 can be found on the pages for each mode:

 * [YARN](running-on-yarn.html#configuration)
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala

@@ -24,10 +24,10 @@ import scala.reflect.ClassTag
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.{Serializer, Kryo}
-import com.twitter.chill.AllScalaRegistrar
+import com.twitter.chill.{AllScalaRegistrar, ResourcePool}

 import org.apache.spark.{SparkEnv, SparkConf}
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{SerializerInstance, KryoSerializer}
 import org.apache.spark.util.MutablePair
 import org.apache.spark.util.Utils
@@ -48,22 +48,41 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
   }
 }

+private[execution] class KryoResourcePool(size: Int)
+  extends ResourcePool[SerializerInstance](size) {
+
+  val ser: KryoSerializer = {
+    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
+    // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
+    // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
+    // related error.
+    new KryoSerializer(sparkConf)
+  }
+
+  def newInstance() = ser.newInstance()
+}
+
 private[sql] object SparkSqlSerializer {
-  // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
-  // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
-  // related error.
-  @transient lazy val ser: KryoSerializer = {
-    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
-    new KryoSerializer(sparkConf)
-  }
+  @transient lazy val resourcePool = new KryoResourcePool(30)
+
+  private[this] def acquireRelease[O](fn: SerializerInstance => O): O = {
+    val kryo = resourcePool.borrow
+    try {
+      fn(kryo)
+    } finally {
+      resourcePool.release(kryo)
+    }
+  }

-  def serialize[T: ClassTag](o: T): Array[Byte] = {
-    ser.newInstance().serialize(o).array()
-  }
+  def serialize[T: ClassTag](o: T): Array[Byte] =
+    acquireRelease { k =>
+      k.serialize(o).array()
+    }

-  def deserialize[T: ClassTag](bytes: Array[Byte]): T = {
-    ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
-  }
+  def deserialize[T: ClassTag](bytes: Array[Byte]): T =
+    acquireRelease { k =>
+      k.deserialize[T](ByteBuffer.wrap(bytes))
+    }
 }

 private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] {
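The pool caps the number of cached SerializerInstances at 30 while letting callers reuse them instead of paying for newInstance() on every call. Chill's ResourcePool provides the caching; as a rough illustration of the borrow/release contract it implements, here is a simplified stand-in (not chill's actual implementation):

import java.util.concurrent.ArrayBlockingQueue

// Simplified stand-in for com.twitter.chill.ResourcePool, for illustration only.
class SimplePool[T <: AnyRef](size: Int)(mk: => T) {
  private val queue = new ArrayBlockingQueue[T](size)

  // Hand out a pooled instance if one is idle, otherwise build a fresh one.
  def borrow(): T = Option(queue.poll()).getOrElse(mk)

  // Return an instance for reuse; silently drop it if the pool is already full.
  def release(item: T): Unit = queue.offer(item)
}

acquireRelease wraps exactly this pairing in try/finally, so an exception thrown inside fn(kryo) cannot leak an instance out of the pool.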