From c18fa464f404ed2612f8c4d355cb0544b355975b Mon Sep 17 00:00:00 2001
From: Ergin Seyfe <eseyfe@fb.com>
Date: Sat, 21 May 2016 16:08:31 -0700
Subject: [PATCH] [SPARK-15280][Input/Output] Refactored OrcOutputWriter and
 moved serialization to a new class.

## What changes were proposed in this pull request?

Refactoring: separated the ORC serialization logic from OrcOutputWriter and moved it to a new class called OrcSerializer.

## How was this patch tested?

Manual tests and existing tests.

Author: Ergin Seyfe <eseyfe@fb.com>

Closes #13066 from seyfe/orc_serializer.
---
 .../spark/sql/hive/orc/OrcRelation.scala           | 84 ++++++++++---------
 1 file changed, 45 insertions(+), 39 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 6e55137dd7..38f50c112a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -149,39 +149,70 @@ private[sql] class DefaultSource
   }
 }
 
-private[orc] class OrcOutputWriter(
-    path: String,
-    bucketId: Option[Int],
-    dataSchema: StructType,
-    context: TaskAttemptContext)
-  extends OutputWriter with HiveInspectors {
+private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
+  extends HiveInspectors {
+
+  def serialize(row: InternalRow): Writable = {
+    wrapOrcStruct(cachedOrcStruct, structOI, row)
+    serializer.serialize(cachedOrcStruct, structOI)
+  }
 
-  private val serializer = {
+  private[this] val serializer = {
     val table = new Properties()
     table.setProperty("columns", dataSchema.fieldNames.mkString(","))
     table.setProperty("columns.types", dataSchema.map(_.dataType.catalogString).mkString(":"))
 
     val serde = new OrcSerde
-    val configuration = context.getConfiguration
-    serde.initialize(configuration, table)
+    serde.initialize(conf, table)
     serde
   }
 
-  // Object inspector converted from the schema of the relation to be written.
-  private val structOI = {
+  // Object inspector converted from the schema of the relation to be serialized.
+  private[this] val structOI = {
     val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(dataSchema.catalogString)
     OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
       .asInstanceOf[SettableStructObjectInspector]
   }
 
+  private[this] val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
+
+  private[this] def wrapOrcStruct(
+      struct: OrcStruct,
+      oi: SettableStructObjectInspector,
+      row: InternalRow): Unit = {
+    val fieldRefs = oi.getAllStructFieldRefs
+    var i = 0
+    while (i < fieldRefs.size) {
+
+      oi.setStructFieldData(
+        struct,
+        fieldRefs.get(i),
+        wrap(
+          row.get(i, dataSchema(i).dataType),
+          fieldRefs.get(i).getFieldObjectInspector,
+          dataSchema(i).dataType))
+      i += 1
+    }
+  }
+}
+
+private[orc] class OrcOutputWriter(
+    path: String,
+    bucketId: Option[Int],
+    dataSchema: StructType,
+    context: TaskAttemptContext)
+  extends OutputWriter {
+
+  private[this] val conf = context.getConfiguration
+
+  private[this] val serializer = new OrcSerializer(dataSchema, conf)
+
   // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this
   // flag to decide whether `OrcRecordWriter.close()` needs to be called.
   private var recordWriterInstantiated = false
 
   private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
     recordWriterInstantiated = true
-
-    val conf = context.getConfiguration
     val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val taskAttemptId = context.getTaskAttemptID
     val partition = taskAttemptId.getTaskID.getId
@@ -206,33 +237,8 @@ private[orc] class OrcOutputWriter(
   override def write(row: Row): Unit =
     throw new UnsupportedOperationException("call writeInternal")
 
-  private def wrapOrcStruct(
-      struct: OrcStruct,
-      oi: SettableStructObjectInspector,
-      row: InternalRow): Unit = {
-    val fieldRefs = oi.getAllStructFieldRefs
-    var i = 0
-    while (i < fieldRefs.size) {
-
-      oi.setStructFieldData(
-        struct,
-        fieldRefs.get(i),
-        wrap(
-          row.get(i, dataSchema(i).dataType),
-          fieldRefs.get(i).getFieldObjectInspector,
-          dataSchema(i).dataType))
-      i += 1
-    }
-  }
-
-  val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
-
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    wrapOrcStruct(cachedOrcStruct, structOI, row)
-
-    recordWriter.write(
-      NullWritable.get(),
-      serializer.serialize(cachedOrcStruct, structOI))
+    recordWriter.write(NullWritable.get(), serializer.serialize(row))
   }
 
   override def close(): Unit = {
-- 
GitLab
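
The refactoring above is a single separation of concerns: the row-to-Writable conversion now lives in OrcSerializer, while OrcOutputWriter only manages the Hadoop RecordWriter lifecycle and delegates to `serializer.serialize(row)`. The sketch below is a minimal, self-contained illustration of that pattern; the names Record, RecordSerializer, and RecordOutputWriter are hypothetical stand-ins, not the Spark/Hive classes touched by this patch.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}

// Hypothetical record type standing in for Spark's InternalRow.
final case class Record(values: Seq[Any])

// Analogous to OrcSerializer: owns only the conversion of a record into its
// serialized form, and can be exercised without touching the file system.
final class RecordSerializer(fieldNames: Seq[String]) {
  def serialize(record: Record): Array[Byte] = {
    val line = fieldNames.zip(record.values)
      .map { case (name, value) => s"$name=$value" }
      .mkString(",")
    line.getBytes(StandardCharsets.UTF_8)
  }
}

// Analogous to OrcOutputWriter: owns only the output-stream lifecycle and
// delegates all serialization to the injected RecordSerializer.
final class RecordOutputWriter(path: Path, serializer: RecordSerializer) {
  private val out = Files.newOutputStream(path)

  def write(record: Record): Unit = {
    out.write(serializer.serialize(record))
    out.write('\n'.toInt)
  }

  def close(): Unit = out.close()
}

object SerializerSketch {
  def main(args: Array[String]): Unit = {
    val serializer = new RecordSerializer(Seq("id", "name"))
    val writer = new RecordOutputWriter(Paths.get("records.txt"), serializer)
    try {
      writer.write(Record(Seq(1, "alice")))
      writer.write(Record(Seq(2, "bob")))
    } finally {
      writer.close()
    }
  }
}
```

Keeping the serializer free of any writer or task-context state is what makes it reusable and testable on its own, which mirrors why the patch has OrcSerializer take only the schema and a Configuration rather than the whole TaskAttemptContext.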