From b10837ab1a7bef04bf7a2773b9e44ed9206643fe Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun <dongjoon@apache.org> Date: Mon, 20 Nov 2017 13:32:01 +0900 Subject: [PATCH] [SPARK-22557][TEST] Use ThreadSignaler explicitly ## What changes were proposed in this pull request? ScalaTest 3.0 uses an implicit `Signaler`. This PR makes it sure all Spark tests uses `ThreadSignaler` explicitly which has the same default behavior of interrupting a thread on the JVM like ScalaTest 2.2.x. This will reduce potential flakiness. ## How was this patch tested? This is testsuite-only update. This should passes the Jenkins tests. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #19784 from dongjoon-hyun/use_thread_signaler. --- .../test/scala/org/apache/spark/DistributedSuite.scala | 7 +++++-- core/src/test/scala/org/apache/spark/DriverSuite.scala | 5 ++++- .../test/scala/org/apache/spark/UnpersistSuite.scala | 8 ++++++-- .../org/apache/spark/deploy/SparkSubmitSuite.scala | 9 ++++++++- .../org/apache/spark/rdd/AsyncRDDActionsSuite.scala | 5 ++++- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 5 ++++- .../OutputCommitCoordinatorIntegrationSuite.scala | 5 ++++- .../org/apache/spark/storage/BlockManagerSuite.scala | 10 ++++++++-- .../scala/org/apache/spark/util/EventLoopSuite.scala | 5 ++++- .../streaming/ProcessingTimeExecutorSuite.scala | 8 +++++--- .../org/apache/spark/sql/streaming/StreamTest.scala | 2 ++ .../apache/spark/sql/hive/SparkSubmitTestUtils.scala | 5 ++++- .../org/apache/spark/streaming/ReceiverSuite.scala | 5 +++-- .../apache/spark/streaming/StreamingContextSuite.scala | 5 +++-- .../spark/streaming/receiver/BlockGeneratorSuite.scala | 7 ++++--- 15 files changed, 68 insertions(+), 23 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index f8005610f7..ea9f6d2fc2 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark import org.scalatest.Matchers -import org.scalatest.concurrent.TimeLimits._ +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.{Millis, Span} import org.apache.spark.security.EncryptionFunSuite @@ -30,7 +30,10 @@ class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() { class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext - with EncryptionFunSuite { + with EncryptionFunSuite with TimeLimits { + + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler val clusterUrl = "local-cluster[2,1,1024]" diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala index be80d278fc..962945e5b6 100644 --- a/core/src/test/scala/org/apache/spark/DriverSuite.scala +++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark import java.io.File -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.prop.TableDrivenPropertyChecks._ import org.scalatest.time.SpanSugar._ @@ -27,6 +27,9 @@ import org.apache.spark.util.Utils class DriverSuite extends SparkFunSuite with TimeLimits { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + ignore("driver should exit after finishing without cleanup (SPARK-530)") { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val masters = Table("master", "local", "local-cluster[2,1,1024]") diff --git a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala index bc3f58cf2a..b58a3ebe6e 100644 --- a/core/src/test/scala/org/apache/spark/UnpersistSuite.scala +++ b/core/src/test/scala/org/apache/spark/UnpersistSuite.scala @@ -17,10 +17,14 @@ package org.apache.spark -import org.scalatest.concurrent.TimeLimits._ +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.{Millis, Span} -class UnpersistSuite extends SparkFunSuite with LocalSparkContext { +class UnpersistSuite extends SparkFunSuite with LocalSparkContext with TimeLimits { + + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + test("unpersist RDD") { sc = new SparkContext("local", "test") val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index cfbf56fb8c..d0a34c5cdc 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -31,7 +31,7 @@ import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path} import org.scalatest.{BeforeAndAfterEach, Matchers} -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.SpanSugar._ import org.apache.spark._ @@ -102,6 +102,9 @@ class SparkSubmitSuite import SparkSubmitSuite._ + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + override def beforeEach() { super.beforeEach() System.setProperty("spark.testing", "true") @@ -1016,6 +1019,10 @@ class SparkSubmitSuite } object SparkSubmitSuite extends SparkFunSuite with TimeLimits { + + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. def runSparkSubmit(args: Seq[String], root: String = ".."): Unit = { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index de0e71a332..24b0144a38 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.SpanSugar._ import org.apache.spark._ @@ -34,6 +34,9 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim @transient private var sc: SparkContext = _ + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + override def beforeAll() { super.beforeAll() sc = new SparkContext("local[2]", "test") diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6222e576d1..d395e09969 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -25,7 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.SpanSugar._ import org.apache.spark._ @@ -102,6 +102,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi import DAGSchedulerSuite._ + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + val conf = new SparkConf /** Set of TaskSets the DAGScheduler has requested executed. */ val taskSets = scala.collection.mutable.Buffer[TaskSet]() diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala index a27dadcf49..d6ff5bb330 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext} -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.{Seconds, Span} import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext} @@ -34,6 +34,9 @@ class OutputCommitCoordinatorIntegrationSuite with LocalSparkContext with TimeLimits { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + override def beforeAll(): Unit = { super.beforeAll() val conf = new SparkConf() diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index d45c194d31..f3e8a2ed1d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -31,8 +31,8 @@ import org.apache.commons.lang3.RandomUtils import org.mockito.{Matchers => mc} import org.mockito.Mockito.{mock, times, verify, when} import org.scalatest._ +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ -import org.scalatest.concurrent.TimeLimits._ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager @@ -57,10 +57,13 @@ import org.apache.spark.util.io.ChunkedByteBuffer class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach with PrivateMethodTester with LocalSparkContext with ResetSystemProperties - with EncryptionFunSuite { + with EncryptionFunSuite with TimeLimits { import BlockManagerSuite._ + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + var conf: SparkConf = null var store: BlockManager = null var store2: BlockManager = null @@ -1450,6 +1453,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE private object BlockManagerSuite { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + private implicit class BlockManagerTestUtils(store: BlockManager) { def dropFromMemoryIfExists( diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala index f4f8388f5f..5507457717 100644 --- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala @@ -23,13 +23,16 @@ import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ -import org.scalatest.concurrent.TimeLimits import org.apache.spark.SparkFunSuite class EventLoopSuite extends SparkFunSuite with TimeLimits { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + test("EventLoop") { val buffer = new ConcurrentLinkedQueue[Int] val eventLoop = new EventLoop[Int]("test") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala index 519e3c01af..80c76915e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala @@ -22,16 +22,18 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import org.eclipse.jetty.util.ConcurrentHashSet -import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.{Eventually, Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.PatienceConfiguration.Timeout -import org.scalatest.concurrent.TimeLimits._ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.streaming.ProcessingTime import org.apache.spark.sql.streaming.util.StreamManualClock -class ProcessingTimeExecutorSuite extends SparkFunSuite { +class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits { + + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler val timeout = 10.seconds diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 70b39b9340..e68fca0505 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -69,7 +69,9 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} */ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with BeforeAndAfterAll { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x implicit val defaultSignaler: Signaler = ThreadSignaler + override def afterAll(): Unit = { super.afterAll() StateStore.stop() // stop the state store maintenance thread and unload store providers diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala index ede44df4af..68ed97d6d1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala @@ -23,7 +23,7 @@ import java.util.Date import scala.collection.mutable.ArrayBuffer -import org.scalatest.concurrent.TimeLimits +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ @@ -33,6 +33,9 @@ import org.apache.spark.util.Utils trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val defaultSignaler: Signaler = ThreadSignaler + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. // This is copied from org.apache.spark.deploy.SparkSubmitSuite protected def runSparkSubmit(args: Seq[String], sparkHomeOpt: Option[String] = None): Unit = { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 5fc626c1f7..145c48e5a9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -38,6 +38,9 @@ import org.apache.spark.util.Utils /** Testsuite for testing the network receiver behavior */ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val signaler: Signaler = ThreadSignaler + test("receiver life cycle") { val receiver = new FakeReceiver @@ -60,8 +63,6 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { // Verify that the receiver intercept[Exception] { - // Necessary to make failAfter interrupt awaitTermination() in ScalaTest 3.x - implicit val signaler: Signaler = ThreadSignaler failAfter(200 millis) { executingThread.join() } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 5810e73f40..52c8959351 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -44,6 +44,9 @@ import org.apache.spark.util.Utils class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits with Logging { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x + implicit val signaler: Signaler = ThreadSignaler + val master = "local[2]" val appName = this.getClass.getSimpleName val batchDuration = Milliseconds(500) @@ -406,8 +409,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL // test whether awaitTermination() does not exit if not time is given val exception = intercept[Exception] { - // Necessary to make failAfter interrupt awaitTermination() in ScalaTest 3.x - implicit val signaler: Signaler = ThreadSignaler failAfter(1000 millis) { ssc.awaitTermination() throw new Exception("Did not wait for stop") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala index 898da4445e..580f831548 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala @@ -24,18 +24,19 @@ import scala.collection.mutable import org.scalatest.BeforeAndAfter import org.scalatest.Matchers._ -import org.scalatest.concurrent.{Signaler, ThreadSignaler} +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ -import org.scalatest.concurrent.TimeLimits._ import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.storage.StreamBlockId import org.apache.spark.util.ManualClock -class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { +class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits { + // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x implicit val defaultSignaler: Signaler = ThreadSignaler + private val blockIntervalMs = 10 private val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms") @volatile private var blockGenerator: BlockGenerator = null -- GitLab