diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index aaeb3d003829a3aeb863828c66f02a525b6ef5dd..6de1fc06858e468f2eb30fa37050a0ca58a04c87 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -146,7 +146,7 @@ object SparkHadoopMapReduceWriter extends Logging {
       case c: Configurable => c.setConf(hadoopConf)
       case _ => ()
     }
-    val writer = taskFormat.getRecordWriter(taskContext)
+    var writer = taskFormat.getRecordWriter(taskContext)
       .asInstanceOf[RecordWriter[K, V]]
     require(writer != null, "Unable to obtain RecordWriter")
     var recordsWritten = 0L
@@ -154,6 +154,7 @@ object SparkHadoopMapReduceWriter extends Logging {
     // Write all rows in RDD partition.
     try {
       val ret = Utils.tryWithSafeFinallyAndFailureCallbacks {
+        // Write rows out, release resource and commit the task.
         while (iterator.hasNext) {
           val pair = iterator.next()
           writer.write(pair._1, pair._2)
@@ -163,12 +164,23 @@ object SparkHadoopMapReduceWriter extends Logging {
             outputMetricsAndBytesWrittenCallback, recordsWritten)
           recordsWritten += 1
         }
-
+        if (writer != null) {
+          writer.close(taskContext)
+          writer = null
+        }
         committer.commitTask(taskContext)
       }(catchBlock = {
-        committer.abortTask(taskContext)
-        logError(s"Task ${taskContext.getTaskAttemptID} aborted.")
-      }, finallyBlock = writer.close(taskContext))
+        // If there is an error, release resource and then abort the task.
+        try {
+          if (writer != null) {
+            writer.close(taskContext)
+            writer = null
+          }
+        } finally {
+          committer.abortTask(taskContext)
+          logError(s"Task ${taskContext.getTaskAttemptID} aborted.")
+        }
+      })
 
       outputMetricsAndBytesWrittenCallback.foreach {
         case (om, callback) =>
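
For context, the ordering this patch enforces is: flush and close the RecordWriter before committer.commitTask, and on any failure close it (best effort) before committer.abortTask, clearing the var so the writer is never closed twice. The standalone sketch below illustrates that pattern under simplified assumptions; SimpleWriter, SimpleCommitter and writePartition are hypothetical stand-ins, not the real Hadoop RecordWriter, FileCommitProtocol or Spark APIs.

object WritePattern {
  // Hypothetical stand-ins for the real RecordWriter and commit protocol.
  trait SimpleWriter {
    def write(key: String, value: String): Unit
    def close(): Unit
  }
  trait SimpleCommitter {
    def commitTask(): Unit
    def abortTask(): Unit
  }

  def writePartition(
      rows: Iterator[(String, String)],
      openWriter: () => SimpleWriter,
      committer: SimpleCommitter): Unit = {
    // A var so the reference can be cleared once the writer is closed,
    // preventing a double close from the error path.
    var writer: SimpleWriter = openWriter()
    try {
      rows.foreach { case (k, v) => writer.write(k, v) }
      // Close (flush) the writer first, then commit the task output.
      writer.close()
      writer = null
      committer.commitTask()
    } catch {
      case t: Throwable =>
        // On error, release the writer best-effort, then always abort the task.
        try {
          if (writer != null) {
            writer.close()
            writer = null
          }
        } finally {
          committer.abortTask()
        }
        throw t
    }
  }
}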