Skip to content
Snippets Groups Projects
Commit 4943ea59 authored by zuotingbing's avatar zuotingbing Committed by Sean Owen
Browse files

[SPARK-22058][CORE] the BufferedInputStream will not be closed if an exception occurs.

## What changes were proposed in this pull request?

EventLoggingListener use `val in = new BufferedInputStream(fs.open(log))` and will close it if `codec.map(_.compressedInputStream(in)).getOrElse(in)`  occurs an exception .
But, if `CompressionCodec.createCodec(new SparkConf, c)` throws an exception, the BufferedInputStream `in` will not be closed anymore.

## How was this patch tested?

exist tests

Author: zuotingbing <zuo.tingbing9@zte.com.cn>

Closes #19277 from zuotingbing/SPARK-22058.
parent 9d48bd0b
No related branches found
No related tags found
No related merge requests found
...@@ -351,14 +351,14 @@ private[spark] object EventLoggingListener extends Logging { ...@@ -351,14 +351,14 @@ private[spark] object EventLoggingListener extends Logging {
// Since we sanitize the app ID to not include periods, it is safe to split on it // Since we sanitize the app ID to not include periods, it is safe to split on it
val logName = log.getName.stripSuffix(IN_PROGRESS) val logName = log.getName.stripSuffix(IN_PROGRESS)
val codecName: Option[String] = logName.split("\\.").tail.lastOption val codecName: Option[String] = logName.split("\\.").tail.lastOption
val codec = codecName.map { c =>
codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
}
try { try {
val codec = codecName.map { c =>
codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
}
codec.map(_.compressedInputStream(in)).getOrElse(in) codec.map(_.compressedInputStream(in)).getOrElse(in)
} catch { } catch {
case e: Exception => case e: Throwable =>
in.close() in.close()
throw e throw e
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment