diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 6e55137dd78e1462db62499a485cbecb04ec8913..38f50c112a23674209169bfacd4b6e0352bf9724 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -149,39 +149,70 @@ private[sql] class DefaultSource
   }
 }
 
-private[orc] class OrcOutputWriter(
-    path: String,
-    bucketId: Option[Int],
-    dataSchema: StructType,
-    context: TaskAttemptContext)
-  extends OutputWriter with HiveInspectors {
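+// Converts Spark SQL InternalRows into Hive Writables via OrcSerde, so the serialization
+// logic can be reused independently of OrcOutputWriter.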
+private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
+  extends HiveInspectors {
+
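+  // Wraps the row's fields into the reused OrcStruct and hands it to the Hive serializer.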
+  def serialize(row: InternalRow): Writable = {
+    wrapOrcStruct(cachedOrcStruct, structOI, row)
+    serializer.serialize(cachedOrcStruct, structOI)
+  }
 
-  private val serializer = {
+  private[this] val serializer = {
     val table = new Properties()
     table.setProperty("columns", dataSchema.fieldNames.mkString(","))
     table.setProperty("columns.types", dataSchema.map(_.dataType.catalogString).mkString(":"))
 
     val serde = new OrcSerde
-    val configuration = context.getConfiguration
-    serde.initialize(configuration, table)
+    serde.initialize(conf, table)
     serde
   }
 
-  // Object inspector converted from the schema of the relation to be written.
-  private val structOI = {
+  // Object inspector converted from the schema of the relation to be serialized.
+  private[this] val structOI = {
     val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(dataSchema.catalogString)
     OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
       .asInstanceOf[SettableStructObjectInspector]
   }
 
+  // A single OrcStruct instance reused for every row to avoid per-record allocation.
+  private[this] val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
+
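+  // Copies each field of the row into the ORC struct, converting Catalyst values through the
+  // corresponding field object inspector.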
+  private[this] def wrapOrcStruct(
+      struct: OrcStruct,
+      oi: SettableStructObjectInspector,
+      row: InternalRow): Unit = {
+    val fieldRefs = oi.getAllStructFieldRefs
+    var i = 0
+    while (i < fieldRefs.size) {
+      oi.setStructFieldData(
+        struct,
+        fieldRefs.get(i),
+        wrap(
+          row.get(i, dataSchema(i).dataType),
+          fieldRefs.get(i).getFieldObjectInspector,
+          dataSchema(i).dataType))
+      i += 1
+    }
+  }
+}
+
+private[orc] class OrcOutputWriter(
+    path: String,
+    bucketId: Option[Int],
+    dataSchema: StructType,
+    context: TaskAttemptContext)
+  extends OutputWriter {
+
+  private[this] val conf = context.getConfiguration
+
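+  // Row-to-Writable conversion is delegated to OrcSerializer; this writer only manages the
+  // underlying ORC record writer and output file.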
+  private[this] val serializer = new OrcSerializer(dataSchema, conf)
+
   // `OrcRecordWriter.close()` creates an empty file if no rows are written at all.  We use this
   // flag to decide whether `OrcRecordWriter.close()` needs to be called.
   private var recordWriterInstantiated = false
 
   private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
     recordWriterInstantiated = true
-
-    val conf = context.getConfiguration
     val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val taskAttemptId = context.getTaskAttemptID
     val partition = taskAttemptId.getTaskID.getId
@@ -206,33 +237,8 @@ private[orc] class OrcOutputWriter(
   override def write(row: Row): Unit =
     throw new UnsupportedOperationException("call writeInternal")
 
-  private def wrapOrcStruct(
-      struct: OrcStruct,
-      oi: SettableStructObjectInspector,
-      row: InternalRow): Unit = {
-    val fieldRefs = oi.getAllStructFieldRefs
-    var i = 0
-    while (i < fieldRefs.size) {
-
-      oi.setStructFieldData(
-        struct,
-        fieldRefs.get(i),
-        wrap(
-          row.get(i, dataSchema(i).dataType),
-          fieldRefs.get(i).getFieldObjectInspector,
-          dataSchema(i).dataType))
-      i += 1
-    }
-  }
-
-  val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
-
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    wrapOrcStruct(cachedOrcStruct, structOI, row)
-
-    recordWriter.write(
-      NullWritable.get(),
-      serializer.serialize(cachedOrcStruct, structOI))
+    recordWriter.write(NullWritable.get(), serializer.serialize(row))
   }
 
   override def close(): Unit = {