Commit f22954ad authored by Reynold Xin

[SPARK-18257][SS] Improve error reporting for FileStressSuite

## What changes were proposed in this pull request?
This patch improves error reporting for FileStressSuite when the error originates in Spark itself rather than in user code. It works by tightening the exception verification so that only the deliberately injected failure is treated as an expected restart, and by getting rid of the unnecessary thread that was used to start the stream.

Also renamed the class from FileStressSuite to FileStreamStressSuite to make it more obvious that it is a streaming suite. A sketch of the tightened verification pattern follows.
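
For context, here is a minimal, hedged sketch of that pattern: `awaitExpectedFailure` and its parameters are hypothetical names, not code from this patch, but the two-level cause check mirrors the diff further down.

```scala
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

// Hypothetical helper illustrating the tightened check: a failure is only
// swallowed when the injected message is found two causes deep
// (StreamingQueryException -> job failure -> injected error). Any other
// exception is not matched here, so it propagates and fails the test.
def awaitExpectedFailure(query: StreamingQuery, injectedErrorMsg: String): Unit = {
  try {
    query.awaitTermination()
  } catch {
    case e: StreamingQueryException
        if e.getCause != null && e.getCause.getCause != null &&
          e.getCause.getCause.getMessage.contains(injectedErrorMsg) =>
      // Expected: the deliberately injected failure. The caller can
      // restart the stream and count this as a recovery.
  }
}
```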

## How was this patch tested?
This is a test-only change. I manually verified the improved error reporting by injecting a bug into the addBatch code of FileStreamSink; see the illustrative sketch below.
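
As a hedged illustration of that manual check (not code from this patch), the injected bug can be approximated with a sink whose addBatch always fails. `FailingSink` is hypothetical; the `addBatch(batchId, data)` signature matches Spark's internal streaming `Sink` trait of this era.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical stand-in for the manual verification: a sink that fails on
// every batch. With the tightened check, this unexpected (non-injected)
// error is no longer silently swallowed by the restart loop and instead
// surfaces in the test output.
class FailingSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    sys.error(s"injected FileStreamSink bug at batch $batchId")
  }
}
```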

Author: Reynold Xin <rxin@databricks.com>

Closes #15757 from rxin/SPARK-18257.
parent e8920252
@@ -36,9 +36,12 @@ import org.apache.spark.util.Utils
  *
  * At the end, the resulting files are loaded and the answer is checked.
  */
-class FileStressSuite extends StreamTest {
+class FileStreamStressSuite extends StreamTest {
   import testImplicits._
 
+  // Error message thrown in the streaming job for testing recovery.
+  private val injectedErrorMsg = "test suite injected failure!"
+
   testQuietly("fault tolerance stress test - unpartitioned output") {
     stressTest(partitionWrites = false)
   }
@@ -101,13 +104,14 @@ class FileStressSuite extends StreamTest {
     val input = spark.readStream.format("text").load(inputDir)
 
     def startStream(): StreamingQuery = {
+      val errorMsg = injectedErrorMsg // work around serialization issue
       val output = input
         .repartition(5)
         .as[String]
         .mapPartitions { iter =>
           val rand = Random.nextInt(100)
           if (rand < 10) {
-            sys.error("failure")
+            sys.error(errorMsg)
           }
           iter.map(_.toLong)
         }
@@ -131,22 +135,21 @@ class FileStressSuite extends StreamTest {
     }
 
     var failures = 0
-    val streamThread = new Thread("stream runner") {
-      while (continue) {
-        if (failures % 10 == 0) { logError(s"Query restart #$failures") }
-        stream = startStream()
-
-        try {
-          stream.awaitTermination()
-        } catch {
-          case ce: StreamingQueryException =>
-            failures += 1
-        }
+    while (continue) {
+      if (failures % 10 == 0) { logError(s"Query restart #$failures") }
+      stream = startStream()
+
+      try {
+        stream.awaitTermination()
+      } catch {
+        case e: StreamingQueryException
+          if e.getCause != null && e.getCause.getCause != null &&
+            e.getCause.getCause.getMessage.contains(injectedErrorMsg) =>
+          // Getting the expected error message
+          failures += 1
       }
     }
 
-    streamThread.join()
-
     logError(s"Stream restarted $failures times.")
     assert(spark.read.parquet(outputDir).distinct().count() == numRecords)
   }
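
A note on the double `getCause` in the new guard: the error raised inside `mapPartitions` fails a task, Spark reports it as a job failure, and the stream execution wraps that in a `StreamingQueryException`, hence the two levels of unwrapping. A more defensive (hypothetical) variant would walk the whole cause chain rather than assume its depth:

```scala
// Hypothetical alternative to the fixed two-level check: search the entire
// cause chain for the injected message, regardless of wrapping depth.
@annotation.tailrec
def causeChainContains(t: Throwable, msg: String): Boolean =
  if (t == null) false
  else if (t.getMessage != null && t.getMessage.contains(msg)) true
  else causeChainContains(t.getCause, msg)
```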