diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index af33a2f2ca3e11b8dcf68ffd55563f8fd658bc01..554a33ce7f1a660a8d3f63e4b12281abf8ab539f 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -63,10 +63,11 @@ extends DeserializationStream { def close() { objIn.close() } } + private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoader: ClassLoader) extends SerializerInstance { - def serialize[T: ClassTag](t: T): ByteBuffer = { + override def serialize[T: ClassTag](t: T): ByteBuffer = { val bos = new ByteArrayOutputStream() val out = serializeStream(bos) out.writeObject(t) @@ -74,23 +75,23 @@ private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoade ByteBuffer.wrap(bos.toByteArray) } - def deserialize[T: ClassTag](bytes: ByteBuffer): T = { + override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { val bis = new ByteBufferInputStream(bytes) val in = deserializeStream(bis) - in.readObject().asInstanceOf[T] + in.readObject() } - def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = { + override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = { val bis = new ByteBufferInputStream(bytes) val in = deserializeStream(bis, loader) - in.readObject().asInstanceOf[T] + in.readObject() } - def serializeStream(s: OutputStream): SerializationStream = { + override def serializeStream(s: OutputStream): SerializationStream = { new JavaSerializationStream(s, counterReset) } - def deserializeStream(s: InputStream): DeserializationStream = { + override def deserializeStream(s: InputStream): DeserializationStream = { new JavaDeserializationStream(s, Utils.getContextOrSparkClassLoader) } diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 99682220b4ab538c6861d08c464f2081897efdea..87ef9bb0b43c6718c0d7c9b233c1b89d5a350acd 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -91,7 +91,7 @@ class KryoSerializer(conf: SparkConf) Thread.currentThread.setContextClassLoader(classLoader) reg.registerClasses(kryo) } catch { - case e: Exception => + case e: Exception => throw new SparkException(s"Failed to invoke $regCls", e) } finally { Thread.currentThread.setContextClassLoader(oldClassLoader) @@ -106,7 +106,7 @@ class KryoSerializer(conf: SparkConf) kryo } - def newInstance(): SerializerInstance = { + override def newInstance(): SerializerInstance = { new KryoSerializerInstance(this) } } @@ -115,20 +115,20 @@ private[spark] class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream { val output = new KryoOutput(outStream) - def writeObject[T: ClassTag](t: T): SerializationStream = { + override def writeObject[T: ClassTag](t: T): SerializationStream = { kryo.writeClassAndObject(output, t) this } - def flush() { output.flush() } - def close() { output.close() } + override def flush() { output.flush() } + override def close() { output.close() } } private[spark] class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream { - val input = new KryoInput(inStream) + private val input = new KryoInput(inStream) - def readObject[T: ClassTag](): T = { + override def readObject[T: ClassTag](): T = { try { kryo.readClassAndObject(input).asInstanceOf[T] } catch { @@ -138,31 +138,31 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser } } - def close() { + override def close() { // Kryo's Input automatically closes the input stream it is using. input.close() } } private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { - val kryo = ks.newKryo() + private val kryo = ks.newKryo() // Make these lazy vals to avoid creating a buffer unless we use them - lazy val output = ks.newKryoOutput() - lazy val input = new KryoInput() + private lazy val output = ks.newKryoOutput() + private lazy val input = new KryoInput() - def serialize[T: ClassTag](t: T): ByteBuffer = { + override def serialize[T: ClassTag](t: T): ByteBuffer = { output.clear() kryo.writeClassAndObject(output, t) ByteBuffer.wrap(output.toBytes) } - def deserialize[T: ClassTag](bytes: ByteBuffer): T = { + override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { input.setBuffer(bytes.array) kryo.readClassAndObject(input).asInstanceOf[T] } - def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = { + override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = { val oldClassLoader = kryo.getClassLoader kryo.setClassLoader(loader) input.setBuffer(bytes.array) @@ -171,11 +171,11 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ obj } - def serializeStream(s: OutputStream): SerializationStream = { + override def serializeStream(s: OutputStream): SerializationStream = { new KryoSerializationStream(kryo, s) } - def deserializeStream(s: InputStream): DeserializationStream = { + override def deserializeStream(s: InputStream): DeserializationStream = { new KryoDeserializationStream(kryo, s) } } diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index e674438c8176cd4f16db75bc37b4cebeb3d2b7db..a9144cdd97b8c5796c531dd6a0047008cb966cbd 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -43,7 +43,7 @@ import org.apache.spark.util.{ByteBufferInputStream, NextIterator} * They are intended to be used to serialize/de-serialize data within a single Spark application. */ @DeveloperApi -trait Serializer { +abstract class Serializer { /** * Default ClassLoader to use in deserialization. Implementations of [[Serializer]] should @@ -61,10 +61,12 @@ trait Serializer { this } + /** Creates a new [[SerializerInstance]]. */ def newInstance(): SerializerInstance } +@DeveloperApi object Serializer { def getSerializer(serializer: Serializer): Serializer = { if (serializer == null) SparkEnv.get.serializer else serializer @@ -81,7 +83,7 @@ object Serializer { * An instance of a serializer, for use by one thread at a time. */ @DeveloperApi -trait SerializerInstance { +abstract class SerializerInstance { def serialize[T: ClassTag](t: T): ByteBuffer def deserialize[T: ClassTag](bytes: ByteBuffer): T @@ -91,21 +93,6 @@ trait SerializerInstance { def serializeStream(s: OutputStream): SerializationStream def deserializeStream(s: InputStream): DeserializationStream - - def serializeMany[T: ClassTag](iterator: Iterator[T]): ByteBuffer = { - // Default implementation uses serializeStream - val stream = new ByteArrayOutputStream() - serializeStream(stream).writeAll(iterator) - val buffer = ByteBuffer.wrap(stream.toByteArray) - buffer.flip() - buffer - } - - def deserializeMany(buffer: ByteBuffer): Iterator[Any] = { - // Default implementation uses deserializeStream - buffer.rewind() - deserializeStream(new ByteBufferInputStream(buffer)).asIterator - } } /** @@ -113,7 +100,7 @@ trait SerializerInstance { * A stream for writing serialized objects. */ @DeveloperApi -trait SerializationStream { +abstract class SerializationStream { def writeObject[T: ClassTag](t: T): SerializationStream def flush(): Unit def close(): Unit @@ -132,7 +119,7 @@ trait SerializationStream { * A stream for reading serialized objects. */ @DeveloperApi -trait DeserializationStream { +abstract class DeserializationStream { def readObject[T: ClassTag](): T def close(): Unit diff --git a/core/src/main/scala/org/apache/spark/serializer/package-info.java b/core/src/main/scala/org/apache/spark/serializer/package-info.java index 4c0b73ab36a002ae16fb6e630ac7c225d201f375..207c6e02e42930026216d4c628922fabecb591fd 100644 --- a/core/src/main/scala/org/apache/spark/serializer/package-info.java +++ b/core/src/main/scala/org/apache/spark/serializer/package-info.java @@ -18,4 +18,4 @@ /** * Pluggable serializers for RDD and shuffle data. */ -package org.apache.spark.serializer; \ No newline at end of file +package org.apache.spark.serializer; diff --git a/core/src/test/java/org/apache/spark/serializer/TestJavaSerializerImpl.java b/core/src/test/java/org/apache/spark/serializer/TestJavaSerializerImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..3d50ab4fabe426f56c69c724e9b5d96dcdccff48 --- /dev/null +++ b/core/src/test/java/org/apache/spark/serializer/TestJavaSerializerImpl.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import scala.Option; +import scala.reflect.ClassTag; + + +/** + * A simple Serializer implementation to make sure the API is Java-friendly. + */ +class TestJavaSerializerImpl extends Serializer { + + @Override + public SerializerInstance newInstance() { + return null; + } + + static class SerializerInstanceImpl extends SerializerInstance { + @Override + public <T> ByteBuffer serialize(T t, ClassTag<T> evidence$1) { + return null; + } + + @Override + public <T> T deserialize(ByteBuffer bytes, ClassLoader loader, ClassTag<T> evidence$1) { + return null; + } + + @Override + public <T> T deserialize(ByteBuffer bytes, ClassTag<T> evidence$1) { + return null; + } + + @Override + public SerializationStream serializeStream(OutputStream s) { + return null; + } + + @Override + public DeserializationStream deserializeStream(InputStream s) { + return null; + } + } + + static class SerializationStreamImpl extends SerializationStream { + + @Override + public <T> SerializationStream writeObject(T t, ClassTag<T> evidence$1) { + return null; + } + + @Override + public void flush() { + + } + + @Override + public void close() { + + } + } + + static class DeserializationStreamImpl extends DeserializationStream { + + @Override + public <T> T readObject(ClassTag<T> evidence$1) { + return null; + } + + @Override + public void close() { + + } + } +} diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..967c9e9899c9dc2e6acad125b669741dc81741f8 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import org.scalatest.FunSuite + +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext +import org.apache.spark.LocalSparkContext +import org.apache.spark.SparkException + + +class KryoSerializerResizableOutputSuite extends FunSuite { + + // trial and error showed this will not serialize with 1mb buffer + val x = (1 to 400000).toArray + + test("kryo without resizable output buffer should fail on large array") { + val conf = new SparkConf(false) + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.kryoserializer.buffer.mb", "1") + conf.set("spark.kryoserializer.buffer.max.mb", "1") + val sc = new SparkContext("local", "test", conf) + intercept[SparkException](sc.parallelize(x).collect()) + LocalSparkContext.stop(sc) + } + + test("kryo with resizable output buffer should succeed on large array") { + val conf = new SparkConf(false) + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.kryoserializer.buffer.mb", "1") + conf.set("spark.kryoserializer.buffer.max.mb", "2") + val sc = new SparkContext("local", "test", conf) + assert(sc.parallelize(x).collect() === x) + LocalSparkContext.stop(sc) + } +} diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index a579fd50bd9e4df5b77ddc93722a3935d76cea8c..e1e35b688d5811bd26ad2b4766d2a5412d654271 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -26,6 +26,7 @@ import org.scalatest.FunSuite import org.apache.spark.{SparkConf, SharedSparkContext} import org.apache.spark.serializer.KryoTest._ + class KryoSerializerSuite extends FunSuite with SharedSparkContext { conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) @@ -207,7 +208,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { .fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x assert(10 + control.sum === result) } - + test("kryo with nonexistent custom registrator should fail") { import org.apache.spark.{SparkConf, SparkException} @@ -238,39 +239,12 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } } -class ClassLoaderTestingObject - -class KryoSerializerResizableOutputSuite extends FunSuite { - import org.apache.spark.SparkConf - import org.apache.spark.SparkContext - import org.apache.spark.LocalSparkContext - import org.apache.spark.SparkException - - // trial and error showed this will not serialize with 1mb buffer - val x = (1 to 400000).toArray - test("kryo without resizable output buffer should fail on large array") { - val conf = new SparkConf(false) - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.kryoserializer.buffer.mb", "1") - conf.set("spark.kryoserializer.buffer.max.mb", "1") - val sc = new SparkContext("local", "test", conf) - intercept[SparkException](sc.parallelize(x).collect) - LocalSparkContext.stop(sc) - } +class ClassLoaderTestingObject - test("kryo with resizable output buffer should succeed on large array") { - val conf = new SparkConf(false) - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.kryoserializer.buffer.mb", "1") - conf.set("spark.kryoserializer.buffer.max.mb", "2") - val sc = new SparkContext("local", "test", conf) - assert(sc.parallelize(x).collect === x) - LocalSparkContext.stop(sc) - } -} object KryoTest { + case class CaseClass(i: Int, s: String) {} class ClassWithNoArgConstructor { diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 1e3c760b845ded54379e9a3ce466d63f37d2d624..bbe68b29d2d8e4d41cedd8d2a7e23f07e8594524 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -61,6 +61,17 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.storage.MemoryStore.Entry") ) ++ + Seq( + // Serializer interface change. See SPARK-3045. + ProblemFilters.exclude[IncompatibleTemplateDefProblem]( + "org.apache.spark.serializer.DeserializationStream"), + ProblemFilters.exclude[IncompatibleTemplateDefProblem]( + "org.apache.spark.serializer.Serializer"), + ProblemFilters.exclude[IncompatibleTemplateDefProblem]( + "org.apache.spark.serializer.SerializationStream"), + ProblemFilters.exclude[IncompatibleTemplateDefProblem]( + "org.apache.spark.serializer.SerializerInstance") + )++ Seq( // Renamed putValues -> putArray + putIterator ProblemFilters.exclude[MissingMethodProblem](