diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 08e05ae0c095b0ac289148a1189941be4d360817..26a6a3effc9ac929694ce45da8106b1b9e42ed09 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.io.{InputStream, IOException} +import java.io.{EOFException, InputStream, IOException} import scala.io.Source @@ -107,6 +107,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { } } } catch { + case _: EOFException if maybeTruncated => case ioe: IOException => throw ioe case e: Exception => diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 1732aca9417ead2a6c2286c28814b3bd77c347d2..88a68af6b647d6028b720290df9632a76f39d673 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -17,15 +17,16 @@ package org.apache.spark.scheduler -import java.io.{File, PrintWriter} +import java.io._ import java.net.URI +import java.util.concurrent.atomic.AtomicInteger import org.json4s.jackson.JsonMethods._ import org.scalatest.BeforeAndAfter import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.io.CompressionCodec +import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec} import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils} /** @@ -72,6 +73,59 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd)) } + /** + * Test replaying 
compressed spark history file that internally throws an EOFException. To + * avoid sensitivity to the compression specifics the test forces an EOFException to occur + * while reading bytes from the underlying stream (such as observed in actual history files + * in some cases) and forces specific failure handling. This validates correctness in both + * cases when maybeTruncated is true or false. + */ + test("Replay compressed inprogress log file succeeding on partial read") { + val buffered = new ByteArrayOutputStream + val codec = new LZ4CompressionCodec(new SparkConf()) + val compstream = codec.compressedOutputStream(buffered) + val writer = new PrintWriter(compstream) + + val applicationStart = SparkListenerApplicationStart("AppStarts", None, + 125L, "Mickey", None) + val applicationEnd = SparkListenerApplicationEnd(1000L) + + // scalastyle:off println + writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart)))) + writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd)))) + // scalastyle:on println + writer.close() + + val logFilePath = Utils.getFilePath(testDir, "events.lz4.inprogress") + val fstream = fileSystem.create(logFilePath) + val bytes = buffered.toByteArray + + fstream.write(bytes, 0, buffered.size) + fstream.close + + // Read the compressed .inprogress file and verify only the first event was parsed. + val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) + val replayer = new ReplayListenerBus() + + val eventMonster = new EventMonster(conf) + replayer.addListener(eventMonster) + + // Verify the replay returns the events given that the input may be truncated. 
+ val logData = EventLoggingListener.openEventLog(logFilePath, fileSystem) + val failingStream = new EarlyEOFInputStream(logData, buffered.size - 10) + replayer.replay(failingStream, logFilePath.toString, true) + + assert(eventMonster.loggedEvents.size === 1) + assert(failingStream.didFail) + + // Verify the replay throws the EOF exception when the input is not expected to be truncated. + val logData2 = EventLoggingListener.openEventLog(logFilePath, fileSystem) + val failingStream2 = new EarlyEOFInputStream(logData2, buffered.size - 10) + intercept[EOFException] { + replayer.replay(failingStream2, logFilePath.toString, false) + } + } + // This assumes the correctness of EventLoggingListener test("End-to-end replay") { testApplicationReplay() @@ -156,4 +210,23 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp override def start() { } } + + /* + * This is a dummy input stream that wraps another input stream but ends prematurely when + * reading at the specified position, throwing an EOFException. + */ + private class EarlyEOFInputStream(in: InputStream, failAtPos: Int) extends InputStream { + private val countDown = new AtomicInteger(failAtPos) + + def didFail: Boolean = countDown.get == 0 + + @throws[IOException] + def read: Int = { + if (countDown.get == 0) { + throw new EOFException("Stream ended prematurely") + } + countDown.decrementAndGet() + in.read + } + } }