From 394d8cb1c4dfd1e496562009e716b8fc06be22cd Mon Sep 17 00:00:00 2001
From: Andrew Or <andrewor14@gmail.com>
Date: Thu, 1 May 2014 21:42:06 -0700
Subject: [PATCH] Add tests for FileLogger, EventLoggingListener, and ReplayListenerBus

Modifications to Spark core are limited to exposing functionality to test
files + minor style fixes. (728 / 769 lines are from tests)

Author: Andrew Or <andrewor14@gmail.com>

Closes #591 from andrewor14/event-log-tests and squashes the following commits:

2883837 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
c3afcea [Andrew Or] Compromise
2d5daf8 [Andrew Or] Use temp directory provided by the OS rather than /tmp
2b52151 [Andrew Or] Remove unnecessary file delete + add a comment
62010fd [Andrew Or] More cleanup (renaming variables, updating comments etc)
ad2beff [Andrew Or] Clean up EventLoggingListenerSuite + modify a few comments
862e752 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
e0ba2f8 [Andrew Or] Fix test failures caused by race condition in processing/mutating events
b990453 [Andrew Or] ReplayListenerBus suite - tests do not all pass yet
ab66a84 [Andrew Or] Tests for FileLogger + delete file after tests
187bb25 [Andrew Or] Formatting and renaming variables
769336f [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
5d38ffe [Andrew Or] Clean up EventLoggingListenerSuite + add comments
e12f4b1 [Andrew Or] Preliminary tests for EventLoggingListener (need major cleanup)
---
 .../scheduler/EventLoggingListener.scala      |  40 +-
 .../spark/scheduler/SparkListenerBus.scala    |   2 +-
 .../org/apache/spark/util/FileLogger.scala    |  28 +-
 .../org/apache/spark/util/JsonProtocol.scala  |  10 +-
 .../scala/org/apache/spark/util/Utils.scala   |  18 +-
 .../scheduler/EventLoggingListenerSuite.scala | 400 ++++++++++++++++++
 .../spark/scheduler/ReplayListenerSuite.scala | 166 ++++++++
 .../apache/spark/util/FileLoggerSuite.scala   | 163 +++++++
 8 files changed, 791 insertions(+), 36 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 create mode 100644 core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
 create mode 100644 core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index d822a8e551..7968a0691d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,13 +18,16 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
+import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{FileLogger, JsonProtocol}
 
@@ -40,31 +43,36 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  */
 private[spark] class EventLoggingListener(
     appName: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration)
+    sparkConf: SparkConf,
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
-  private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
-  private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
+  private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
+  private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
+  private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
+  private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+  private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
 
-  private val logger =
-    new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
-      shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+  protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
+    shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+
+  // For testing. Keep track of all JSON serialized events that have been logged.
+  private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
   /**
    * Begin logging events.
    * If compression is used, log a file that indicates which compression library is used.
    */
   def start() {
+    logger.start()
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
-      val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+      val codec =
+        sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
       logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
     }
     logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
@@ -73,11 +81,14 @@ private[spark] class EventLoggingListener(
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
-    val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
-    logger.logLine(eventJson)
+    val eventJson = JsonProtocol.sparkEventToJson(event)
+    logger.logLine(compact(render(eventJson)))
     if (flushLogger) {
       logger.flush()
     }
+    if (testing) {
+      loggedEvents += eventJson
+    }
   }
 
   // Events that do not trigger a flush
@@ -121,13 +132,12 @@ private[spark] class EventLoggingListener(
 }
 
 private[spark] object EventLoggingListener extends Logging {
+  val DEFAULT_LOG_DIR = "/tmp/spark-events"
   val LOG_PREFIX = "EVENT_LOG_"
   val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
   val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
   val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-  val LOG_FILE_PERMISSIONS: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
-
+  val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]
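
The `testing` flag and `loggedEvents` buffer introduced above are the hooks the new suites depend on: whenever `spark.eventLog.testing` is set, every event the listener serializes is also retained in memory as a json4s JValue. A minimal sketch of how a test might exercise them (the app name and directory are illustrative, and the caller must live in the org.apache.spark.scheduler package, since loggedEvents is private[scheduler]):

    // Sketch only: exercising the new test hooks in EventLoggingListener.
    val conf = new SparkConf()
      .set("spark.eventLog.testing", "true")           // enables the in-memory buffer
      .set("spark.eventLog.dir", "/tmp/spark-events")  // illustrative log directory
    val listener = new EventLoggingListener("test-app", conf)
    listener.start()
    listener.onApplicationEnd(SparkListenerApplicationEnd(1000L))
    assert(listener.loggedEvents.nonEmpty)  // serialized JValues are retained for inspection
    listener.stop()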
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index d6df193d9b..0286aac876 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -37,7 +37,7 @@ private[spark] trait SparkListenerBus {
    * Post an event to all attached listeners. This does nothing if the event is
    * SparkListenerShutdown.
    */
-  protected def postToAll(event: SparkListenerEvent) {
+  def postToAll(event: SparkListenerEvent) {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
         sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 0965e0f0f7..0e6d21b220 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util
 
-import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
+import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 
 /**
@@ -39,8 +40,8 @@ import org.apache.spark.io.CompressionCodec
  */
 private[spark] class FileLogger(
     logDir: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration,
+    sparkConf: SparkConf,
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
     overwrite: Boolean = true,
@@ -55,14 +56,19 @@ private[spark] class FileLogger(
   var fileIndex = 0
 
   // Only used if compression is enabled
-  private lazy val compressionCodec = CompressionCodec.createCodec(conf)
+  private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf)
 
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
 
   private var writer: Option[PrintWriter] = None
 
-  createLogDir()
+  /**
+   * Start this logger by creating the logging directory.
+   */
+  def start() {
+    createLogDir()
+  }
 
   /**
    * Create a logging directory with the given path.
@@ -83,7 +89,7 @@ private[spark] class FileLogger(
     }
     if (dirPermissions.isDefined) {
       val fsStatus = fileSystem.getFileStatus(path)
-      if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) {
+      if (fsStatus.getPermission.toShort != dirPermissions.get.toShort) {
         fileSystem.setPermission(path, dirPermissions.get)
       }
     }
@@ -92,14 +98,14 @@ private[spark] class FileLogger(
   /**
    * Create a new writer for the file identified by the given path.
    * If the permissions are not passed in, it will default to use the permissions
-   * (dirpermissions) used when class was instantiated.
+   * (dirPermissions) used when class was instantiated.
    */
   private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
-    val isDefaultLocal = (defaultFs == null || defaultFs == "file")
     val path = new Path(logPath)
+    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+    val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */
@@ -112,7 +118,7 @@ private[spark] class FileLogger(
         hadoopDataStream.get
       }
 
-    perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)}
+    perms.orElse(dirPermissions).foreach { p => fileSystem.setPermission(path, p) }
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
@@ -127,7 +133,7 @@ private[spark] class FileLogger(
     val writeInfo = if (!withTime) {
       msg
     } else {
-      val date = new Date(System.currentTimeMillis())
+      val date = new Date(System.currentTimeMillis)
       dateFormat.get.format(date) + ": " + msg
     }
     writer.foreach(_.print(writeInfo))
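
With createLogDir() moved out of the constructor, a FileLogger now does nothing on disk until start() is called, which is what lets the new FileLoggerSuite construct loggers freely and control when directories appear. A rough lifecycle sketch under that reading (the path is illustrative; the calls mirror the suite below):

    // Sketch only: FileLogger lifecycle after this change.
    val logger = new FileLogger("/tmp/test-file-logger", new SparkConf)
    logger.start()            // creates the log directory (formerly a constructor side effect)
    logger.newFile("part-0")  // opens a writer for a named file in that directory
    logger.logLine("hello")
    logger.flush()
    logger.close()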
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 9aed3e0985..09825087bb 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -646,11 +646,11 @@ private[spark] object JsonProtocol {
   }
 
   def propertiesFromJson(json: JValue): Properties = {
-    val properties = new Properties()
-    if (json != JNothing) {
-      mapFromJson(json).map { case (k, v) => properties.setProperty(k, v) }
-    }
-    properties
+    Utils.jsonOption(json).map { value =>
+      val properties = new Properties
+      mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
+      properties
+    }.getOrElse(null)
   }
 
   def UUIDFromJson(json: JValue): UUID = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2c934a4bac..536a740140 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1062,15 +1062,25 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * return true if this is Windows.
+   * Return the absolute path of a file in the given directory.
    */
-  def isWindows = Option(System.getProperty("os.name")).
-    map(_.startsWith("Windows")).getOrElse(false)
+  def getFilePath(dir: File, fileName: String): Path = {
+    assert(dir.isDirectory)
+    val path = new File(dir, fileName).getAbsolutePath
+    new Path(path)
+  }
+
+  /**
+   * Return true if this is Windows.
+   */
+  def isWindows = {
+    Option(System.getProperty("os.name")).exists(_.startsWith("Windows"))
+  }
 
   /**
    * Indicates whether Spark is currently running unit tests.
    */
-  private[spark] def isTesting = {
+  def isTesting = {
     sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
   }
 }
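
The two helpers exposed here serve the new suites: getFilePath builds an absolute Hadoop Path inside a directory, and isTesting is widened from private[spark]. A small usage sketch, mirroring how the suites pair it with Guava's temp-dir helper:

    // Sketch: building a log path inside an OS-provided temp directory.
    val testDir = com.google.common.io.Files.createTempDir()
    val logDirPath: org.apache.hadoop.fs.Path = Utils.getFilePath(testDir, "spark-events")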
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
new file mode 100644
index 0000000000..95f5bcd855
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.io.Source
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * Test whether EventLoggingListener logs events properly.
+ *
+ * This tests whether EventLoggingListener actually creates special files while logging events,
+ * whether the parsing of these special files is correct, and whether the logged events can be
+ * read and deserialized into actual SparkListenerEvents.
+ */
+class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
+  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val allCompressionCodecs = Seq[String](
+    "org.apache.spark.io.LZFCompressionCodec",
+    "org.apache.spark.io.SnappyCompressionCodec"
+  )
+  private val testDir = Files.createTempDir()
+  private val logDirPath = Utils.getFilePath(testDir, "spark-events")
+
+  after {
+    Try { fileSystem.delete(logDirPath, true) }
+  }
+
+  test("Parse names of special files") {
+    testParsingFileName()
+  }
+
+  test("Verify special files exist") {
+    testSpecialFilesExist()
+  }
+
+  test("Verify special files exist with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testSpecialFilesExist(compressionCodec = Some(codec))
+    }
+  }
+
+  test("Parse event logging info") {
+    testParsingLogInfo()
+  }
+
+  test("Parse event logging info with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testParsingLogInfo(compressionCodec = Some(codec))
+    }
+  }
+
+  test("Basic event logging") {
+    testEventLogging()
+  }
+
+  test("Basic event logging with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testEventLogging(compressionCodec = Some(codec))
+    }
+  }
+
+  test("End-to-end event logging") {
+    testApplicationEventLogging()
+  }
+
+  test("End-to-end event logging with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testApplicationEventLogging(compressionCodec = Some(codec))
+    }
+  }
+
+
+  /* ----------------- *
+   * Actual test logic *
+   * ----------------- */
+
+  import EventLoggingListenerSuite._
+
+  /**
+   * Test whether names of special files are correctly identified and parsed.
+   */
+  private def testParsingFileName() {
+    val logPrefix = EventLoggingListener.LOG_PREFIX
+    val sparkVersionPrefix = EventLoggingListener.SPARK_VERSION_PREFIX
+    val compressionCodecPrefix = EventLoggingListener.COMPRESSION_CODEC_PREFIX
+    val applicationComplete = EventLoggingListener.APPLICATION_COMPLETE
+    assert(EventLoggingListener.isEventLogFile(logPrefix + "0"))
+    assert(EventLoggingListener.isEventLogFile(logPrefix + "100"))
+    assert(EventLoggingListener.isEventLogFile(logPrefix + "ANYTHING"))
+    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "0.9.1"))
+    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "1.0.0"))
+    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "ANYTHING"))
+    assert(EventLoggingListener.isApplicationCompleteFile(applicationComplete))
+    allCompressionCodecs.foreach { codec =>
+      assert(EventLoggingListener.isCompressionCodecFile(compressionCodecPrefix + codec))
+    }
+
+    // Negatives
+    assert(!EventLoggingListener.isEventLogFile("The greatest man of all mankind"))
+    assert(!EventLoggingListener.isSparkVersionFile("Will never falter in the face of death!"))
+    assert(!EventLoggingListener.isCompressionCodecFile("Unless he chooses to leave behind"))
+    assert(!EventLoggingListener.isApplicationCompleteFile("The very treasure he calls Macbeth"))
+
+    // Verify that parsing is correct
+    assert(EventLoggingListener.parseSparkVersion(sparkVersionPrefix + "1.0.0") === "1.0.0")
+    allCompressionCodecs.foreach { codec =>
+      assert(EventLoggingListener.parseCompressionCodec(compressionCodecPrefix + codec) === codec)
+    }
+  }
+
+  /**
+   * Test whether the special files produced by EventLoggingListener exist.
+   *
+   * There should be exactly one event log and one spark version file throughout the entire
+   * execution. If a compression codec is specified, then the compression codec file should
+   * also exist. Only after the application has completed does the test expect the application
+   * completed file to be present.
+   */
+  private def testSpecialFilesExist(compressionCodec: Option[String] = None) {
+
+    def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) {
+      val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0
+      val numApplicationCompleteFiles = if (loggerStopped) 1 else 0
+      assert(logFiles.size === 2 + numCompressionCodecFiles + numApplicationCompleteFiles)
+      assert(eventLogsExist(logFiles))
+      assert(sparkVersionExists(logFiles))
+      assert(compressionCodecExists(logFiles) === compressionCodec.isDefined)
+      assert(applicationCompleteExists(logFiles) === loggerStopped)
+      assertSparkVersionIsValid(logFiles)
+      compressionCodec.foreach { codec =>
+        assertCompressionCodecIsValid(logFiles, codec)
+      }
+    }
+
+    // Verify logging directory exists
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val eventLogger = new EventLoggingListener("test", conf)
+    eventLogger.start()
+    val logPath = new Path(eventLogger.logDir)
+    assert(fileSystem.exists(logPath))
+    val logDir = fileSystem.getFileStatus(logPath)
+    assert(logDir.isDir)
+
+    // Verify special files are as expected before stop()
+    var logFiles = fileSystem.listStatus(logPath)
+    assert(logFiles != null)
+    assertFilesExist(logFiles, loggerStopped = false)
+
+    // Verify special files are as expected after stop()
+    eventLogger.stop()
+    logFiles = fileSystem.listStatus(logPath)
+    assertFilesExist(logFiles, loggerStopped = true)
+  }
+
+  /**
+   * Test whether EventLoggingListener correctly parses the information in the logs.
+   *
+   * This includes whether it returns the correct Spark version, compression codec (if any),
+   * and the application's completion status.
+   */
+  private def testParsingLogInfo(compressionCodec: Option[String] = None) {
+
+    def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
+      assert(info.logPaths.size > 0)
+      assert(info.sparkVersion === SparkContext.SPARK_VERSION)
+      assert(info.compressionCodec.isDefined === compressionCodec.isDefined)
+      info.compressionCodec.foreach { codec =>
+        assert(compressionCodec.isDefined)
+        val expectedCodec = compressionCodec.get.split('.').last
+        assert(codec.getClass.getSimpleName === expectedCodec)
+      }
+      assert(info.applicationComplete === loggerStopped)
+    }
+
+    // Verify that all information is correctly parsed before stop()
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val eventLogger = new EventLoggingListener("test", conf)
+    eventLogger.start()
+    var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
+
+    // Verify that all information is correctly parsed after stop()
+    eventLogger.stop()
+    eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    assertInfoCorrect(eventLoggingInfo, loggerStopped = true)
+  }
+
+  /**
+   * Test basic event logging functionality.
+   *
+   * This creates two simple events, posts them to the EventLoggingListener, and verifies that
+   * exactly these two events are logged in the expected file.
+   */
+  private def testEventLogging(compressionCodec: Option[String] = None) {
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val eventLogger = new EventLoggingListener("test", conf)
+    val listenerBus = new LiveListenerBus
+    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+
+    // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
+    eventLogger.start()
+    listenerBus.start()
+    listenerBus.addListener(eventLogger)
+    listenerBus.postToAll(applicationStart)
+    listenerBus.postToAll(applicationEnd)
+
+    // Verify file contains exactly the two events logged
+    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    assert(eventLoggingInfo.logPaths.size > 0)
+    val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
+    assert(lines.size === 2)
+    assert(lines(0).contains("SparkListenerApplicationStart"))
+    assert(lines(1).contains("SparkListenerApplicationEnd"))
+    assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
+    assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
+    eventLogger.stop()
+  }
+
+  /**
+   * Test end-to-end event logging functionality in an application.
+   * This runs a simple Spark job and asserts that the expected events are logged when expected.
+   */
+  private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
+    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val sc = new SparkContext("local", "test", conf)
+    assert(sc.eventLogger.isDefined)
+    val eventLogger = sc.eventLogger.get
+    val expectedLogDir = logDirPath.toString
+    assert(eventLogger.logDir.startsWith(expectedLogDir))
+
+    // Begin listening for events that trigger asserts
+    val eventExistenceListener = new EventExistenceListener(eventLogger)
+    sc.addSparkListener(eventExistenceListener)
+
+    // Trigger asserts for whether the expected events are actually logged
+    sc.parallelize(1 to 10000).count()
+    sc.stop()
+
+    // Ensure all asserts have actually been triggered
+    eventExistenceListener.assertAllCallbacksInvoked()
+  }
+
+  /**
+   * Assert that all of the specified events are logged by the given EventLoggingListener.
+   */
+  private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) {
+    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    assert(eventLoggingInfo.logPaths.size > 0)
+    val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
+    val eventSet = mutable.Set(events: _*)
+    lines.foreach { line =>
+      eventSet.foreach { event =>
+        if (line.contains(event)) {
+          val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+          val eventType = Utils.getFormattedClassName(parsedEvent)
+          if (eventType == event) {
+            eventSet.remove(event)
+          }
+        }
+      }
+    }
+    assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+  }
+
+  /**
+   * Read all lines from the file specified by the given path.
+   * If a compression codec is specified, use it to read the file.
+   */
+  private def readFileLines(
+      filePath: Path,
+      compressionCodec: Option[CompressionCodec]): Seq[String] = {
+    val fstream = fileSystem.open(filePath)
+    val cstream =
+      compressionCodec.map { codec =>
+        codec.compressedInputStream(fstream)
+      }.getOrElse(fstream)
+    Source.fromInputStream(cstream).getLines().toSeq
+  }
+
+  /**
+   * A listener that asserts certain events are logged by the given EventLoggingListener.
+   * This is necessary because events are posted asynchronously in a different thread.
+   */
+  private class EventExistenceListener(eventLogger: EventLoggingListener) extends SparkListener {
+    var jobStarted = false
+    var jobEnded = false
+    var appEnded = false
+
+    override def onJobStart(jobStart: SparkListenerJobStart) {
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerApplicationStart),
+        Utils.getFormattedClassName(SparkListenerBlockManagerAdded),
+        Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
+      ))
+      jobStarted = true
+    }
+
+    override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerJobStart),
+        Utils.getFormattedClassName(SparkListenerJobEnd),
+        Utils.getFormattedClassName(SparkListenerStageSubmitted),
+        Utils.getFormattedClassName(SparkListenerStageCompleted),
+        Utils.getFormattedClassName(SparkListenerTaskStart),
+        Utils.getFormattedClassName(SparkListenerTaskEnd)
+      ))
+      jobEnded = true
+    }
+
+    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+      assertEventsExist(eventLogger, Seq[String](
+        Utils.getFormattedClassName(SparkListenerApplicationEnd)
+      ))
+      appEnded = true
+    }
+
+    def assertAllCallbacksInvoked() {
+      assert(jobStarted, "JobStart callback not invoked!")
+      assert(jobEnded, "JobEnd callback not invoked!")
+      assert(appEnded, "ApplicationEnd callback not invoked!")
+    }
+  }
+
+
+  /* -------------------------------------------------------- *
+   * Helper methods for validating state of the special files *
+   * -------------------------------------------------------- */
+
+  private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile)
+  }
+
+  private def sparkVersionExists(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isSparkVersionFile)
+  }
+
+  private def compressionCodecExists(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isCompressionCodecFile)
+  }
+
+  private def applicationCompleteExists(logFiles: Array[FileStatus]): Boolean = {
+    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isApplicationCompleteFile)
+  }
+
+  private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) {
+    val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile)
+    assert(file.isDefined)
+    assert(EventLoggingListener.parseSparkVersion(file.get) === SparkContext.SPARK_VERSION)
+  }
+
+  private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) {
+    val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isCompressionCodecFile)
+    assert(file.isDefined)
+    assert(EventLoggingListener.parseCompressionCodec(file.get) === compressionCodec)
+  }
+
+}
+
+
+object EventLoggingListenerSuite {
+
+  /** Get a SparkConf with event logging enabled. */
+  def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None) = {
+    val conf = new SparkConf
+    conf.set("spark.eventLog.enabled", "true")
+    conf.set("spark.eventLog.testing", "true")
+    conf.set("spark.eventLog.dir", logDir.toString)
+    compressionCodec.foreach { codec =>
+      conf.set("spark.eventLog.compress", "true")
+      conf.set("spark.io.compression.codec", codec)
+    }
+    conf
+  }
+}
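
getLoggingConf above is the shared source of test configuration: ReplayListenerSuite below reuses it so the logged and replayed runs agree on settings. A hedged sketch of a call (directory and codec are illustrative):

    // Sketch: building a test conf the way the suites do.
    val conf = EventLoggingListenerSuite.getLoggingConf(
      new Path("/tmp/spark-events-test"),               // illustrative directory
      Some("org.apache.spark.io.LZFCompressionCodec"))  // optional codec
    assert(conf.get("spark.eventLog.compress") == "true")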
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
new file mode 100644
index 0000000000..d1fe1fc348
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.PrintWriter
+
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * Test whether ReplayListenerBus replays events from logs correctly.
+ */
+class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
+  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val allCompressionCodecs = Seq[String](
+    "org.apache.spark.io.LZFCompressionCodec",
+    "org.apache.spark.io.SnappyCompressionCodec"
+  )
+  private val testDir = Files.createTempDir()
+
+  after {
+    Try { fileSystem.delete(Utils.getFilePath(testDir, "events.txt"), true) }
+    Try { fileSystem.delete(Utils.getFilePath(testDir, "test-replay"), true) }
+  }
+
+  test("Simple replay") {
+    testSimpleReplay()
+  }
+
+  test("Simple replay with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testSimpleReplay(Some(codec))
+    }
+  }
+
+  // This assumes the correctness of EventLoggingListener
+  test("End-to-end replay") {
+    testApplicationReplay()
+  }
+
+  // This assumes the correctness of EventLoggingListener
+  test("End-to-end replay with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testApplicationReplay(Some(codec))
+    }
+  }
+
+
+  /* ----------------- *
+   * Actual test logic *
+   * ----------------- */
+
+  /**
+   * Test simple replaying of events.
+   */
+  private def testSimpleReplay(codecName: Option[String] = None) {
+    val logFilePath = Utils.getFilePath(testDir, "events.txt")
+    val codec = codecName.map(getCompressionCodec)
+    val fstream = fileSystem.create(logFilePath)
+    val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+    val writer = new PrintWriter(cstream)
+    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+    writer.close()
+    val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath, codecName)
+    val eventMonster = new EventMonster(conf)
+    replayer.addListener(eventMonster)
+    replayer.replay()
+    assert(eventMonster.loggedEvents.size === 2)
+    assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
+    assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
+  }
+
+  /**
+   * Test end-to-end replaying of events.
+   *
+   * This test runs a few simple jobs with event logging enabled, and compares each emitted
+   * event to the corresponding event replayed from the event logs. This test makes the
+   * assumption that the event logging behavior is correct (tested in a separate suite).
+   */
+  private def testApplicationReplay(codecName: Option[String] = None) {
+    val logDirPath = Utils.getFilePath(testDir, "test-replay")
+    val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName)
+    val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
+
+    // Run a few jobs
+    sc.parallelize(1 to 100, 1).count()
+    sc.parallelize(1 to 100, 2).map(i => (i, i)).count()
+    sc.parallelize(1 to 100, 3).map(i => (i, i)).groupByKey().count()
+    sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().persist().count()
+    sc.stop()
+
+    // Prepare information needed for replay
+    val codec = codecName.map(getCompressionCodec)
+    val applications = fileSystem.listStatus(logDirPath)
+    assert(applications != null && applications.size > 0)
+    val eventLogDir = applications.sortBy(_.getAccessTime).last
+    assert(eventLogDir.isDir)
+    val logFiles = fileSystem.listStatus(eventLogDir.getPath)
+    assert(logFiles != null && logFiles.size > 0)
+    val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_"))
+    assert(logFile.isDefined)
+    val logFilePath = logFile.get.getPath
+
+    // Replay events
+    val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+    val eventMonster = new EventMonster(conf)
+    replayer.addListener(eventMonster)
+    replayer.replay()
+
+    // Verify the same events are replayed in the same order
+    assert(sc.eventLogger.isDefined)
+    val originalEvents = sc.eventLogger.get.loggedEvents
+    val replayedEvents = eventMonster.loggedEvents
+    originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) }
+  }
+
+  /**
+   * A simple listener that buffers all the events it receives.
+   *
+   * The event buffering functionality must be implemented within EventLoggingListener itself.
+   * This is because of the following race condition: the event may be mutated between being
+   * processed by one listener and being processed by another. Thus, in order to establish
+   * a fair comparison between the original events and the replayed events, both functionalities
+   * must be implemented within one listener (i.e. the EventLoggingListener).
+   *
+   * This child listener inherits only the event buffering functionality, but does not actually
+   * log the events.
+   */
+  private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) {
+    logger.close()
+  }
+
+  private def getCompressionCodec(codecName: String) = {
+    val conf = new SparkConf
+    conf.set("spark.io.compression.codec", codecName)
+    CompressionCodec.createCodec(conf)
+  }
+
+}
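
The EventMonster comment above points at a real hazard: the bus hands the same event instance to every listener, so a separate buffering listener could observe a version mutated after EventLoggingListener serialized it. A contrived, self-contained illustration of that race (MutableEvent is hypothetical, not a Spark type):

    // Hypothetical illustration of the mutation race EventMonster avoids by
    // buffering inside EventLoggingListener itself.
    case class MutableEvent(var payload: String)
    val event = MutableEvent("original")
    val serializedByLogger = event.payload  // listener 1 serializes the event...
    event.payload = "mutated"               // ...the event is mutated in between...
    val bufferedByOther = event.payload     // ...listener 2 buffers the mutated version
    assert(serializedByLogger != bufferedByOther)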
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
new file mode 100644
index 0000000000..f675e1e5b4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.IOException
+
+import scala.io.Source
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.apache.hadoop.fs.Path
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Test writing files through the FileLogger.
+ */
+class FileLoggerSuite extends FunSuite with BeforeAndAfter {
+  private val fileSystem = Utils.getHadoopFileSystem("/")
+  private val allCompressionCodecs = Seq[String](
+    "org.apache.spark.io.LZFCompressionCodec",
+    "org.apache.spark.io.SnappyCompressionCodec"
+  )
+  private val testDir = Files.createTempDir()
+  private val logDirPath = Utils.getFilePath(testDir, "test-file-logger")
+  private val logDirPathString = logDirPath.toString
+
+  after {
+    Try { fileSystem.delete(logDirPath, true) }
+  }
+
+  test("Simple logging") {
+    testSingleFile()
+  }
+
+  test("Simple logging with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testSingleFile(Some(codec))
+    }
+  }
+
+  test("Logging multiple files") {
+    testMultipleFiles()
+  }
+
+  test("Logging multiple files with compression") {
+    allCompressionCodecs.foreach { codec =>
+      testMultipleFiles(Some(codec))
+    }
+  }
+
+  test("Logging when directory already exists") {
+    // Create the logging directory multiple times
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+
+    // If overwrite is not enabled, an exception should be thrown
+    intercept[IOException] {
+      new FileLogger(logDirPathString, new SparkConf, overwrite = false).start()
+    }
+  }
+
+
+  /* ----------------- *
+   * Actual test logic *
+   * ----------------- */
+
+  /**
+   * Test logging to a single file.
+   */
+  private def testSingleFile(codecName: Option[String] = None) {
+    val conf = getLoggingConf(codecName)
+    val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
+    val logger =
+      if (codecName.isDefined) {
+        new FileLogger(logDirPathString, conf, compress = true)
+      } else {
+        new FileLogger(logDirPathString, conf)
+      }
+    logger.start()
+    assert(fileSystem.exists(logDirPath))
+    assert(fileSystem.getFileStatus(logDirPath).isDir)
+    assert(fileSystem.listStatus(logDirPath).size === 0)
+
+    logger.newFile()
+    val files = fileSystem.listStatus(logDirPath)
+    assert(files.size === 1)
+    val firstFile = files.head
+    val firstFilePath = firstFile.getPath
+
+    logger.log("hello")
+    logger.flush()
+    assert(readFileContent(firstFilePath, codec) === "hello")
+
+    logger.log(" world")
+    logger.close()
+    assert(readFileContent(firstFilePath, codec) === "hello world")
+  }
+
+  /**
+   * Test logging to multiple files.
+   */
+  private def testMultipleFiles(codecName: Option[String] = None) {
+    val conf = getLoggingConf(codecName)
+    val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
+    val logger =
+      if (codecName.isDefined) {
+        new FileLogger(logDirPathString, conf, compress = true)
+      } else {
+        new FileLogger(logDirPathString, conf)
+      }
+    logger.start()
+    logger.newFile("Jean_Valjean")
+    logger.logLine("Who am I?")
+    logger.logLine("Destiny?")
+    logger.newFile("John_Valjohn")
+    logger.logLine("One")
+    logger.logLine("Two three...")
+    logger.newFile("Wolverine")
+    logger.logLine("There was a time")
+    logger.logLine("A time when our enemies knew honor.")
+    logger.close()
+    assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?")
+    assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...")
+    assert(readFileContent(new Path(logDirPath, "Wolverine"), codec) ===
+      "There was a time\nA time when our enemies knew honor.")
+  }
+
+  /**
+   * Read the content of the file specified by the given path.
+   * If a compression codec is specified, use it to read the file.
+   */
+  private def readFileContent(logPath: Path, codec: Option[CompressionCodec] = None): String = {
+    val fstream = fileSystem.open(logPath)
+    val cstream = codec.map(_.compressedInputStream(fstream)).getOrElse(fstream)
+    Source.fromInputStream(cstream).getLines().mkString("\n")
+  }
+
+  private def getLoggingConf(codecName: Option[String]) = {
+    val conf = new SparkConf
+    codecName.foreach { c => conf.set("spark.io.compression.codec", c) }
+    conf
+  }
+
+}
--
GitLab