From 4943ea59840a894ca47d241fe68d520f1e97fa56 Mon Sep 17 00:00:00 2001 From: zuotingbing <zuo.tingbing9@zte.com.cn> Date: Sun, 24 Sep 2017 09:38:46 +0100 Subject: [PATCH] [SPARK-22058][CORE] the BufferedInputStream will not be closed if an exception occurs. ## What changes were proposed in this pull request? EventLoggingListener uses `val in = new BufferedInputStream(fs.open(log))` and will close it if `codec.map(_.compressedInputStream(in)).getOrElse(in)` throws an exception. But, if `CompressionCodec.createCodec(new SparkConf, c)` throws an exception, the BufferedInputStream `in` will never be closed. ## How was this patch tested? Existing tests. Author: zuotingbing <zuo.tingbing9@zte.com.cn> Closes #19277 from zuotingbing/SPARK-22058. --- .../org/apache/spark/scheduler/EventLoggingListener.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 00ab2a393e..9dafa0b764 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -351,14 +351,14 @@ private[spark] object EventLoggingListener extends Logging { // Since we sanitize the app ID to not include periods, it is safe to split on it val logName = log.getName.stripSuffix(IN_PROGRESS) val codecName: Option[String] = logName.split("\\.").tail.lastOption - val codec = codecName.map { c => - codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) - } try { + val codec = codecName.map { c => + codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) + } codec.map(_.compressedInputStream(in)).getOrElse(in) } catch { - case e: Exception => + case e: Throwable => in.close() throw e } -- GitLab