Skip to content
Snippets Groups Projects
Commit 38877ccf authored by Kan Zhang's avatar Kan Zhang Committed by Reynold Xin
Browse files

Fixing a race condition in event listener unit test

Author: Kan Zhang <kzhang@apache.org>

Closes #401 from kanzhang/fix-1475 and squashes the following commits:

c6058bd [Kan Zhang] Fixing a race condition in event listener unit test
parent 016a8776
No related branches found
No related tags found
No related merge requests found
...@@ -50,9 +50,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { ...@@ -50,9 +50,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
} }
} }
// Exposed for testing
@volatile private[spark] var stopCalled = false
/** /**
* Start sending events to attached listeners. * Start sending events to attached listeners.
* *
...@@ -97,7 +94,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { ...@@ -97,7 +94,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
} }
def stop() { def stop() {
stopCalled = true
if (!started) { if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
} }
......
...@@ -77,14 +77,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc ...@@ -77,14 +77,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
test("bus.stop() waits for the event queue to completely drain") { test("bus.stop() waits for the event queue to completely drain") {
@volatile var drained = false @volatile var drained = false
// When Listener has started
val listenerStarted = new Semaphore(0)
// Tells the listener to stop blocking // Tells the listener to stop blocking
val listenerWait = new Semaphore(1) val listenerWait = new Semaphore(0)
// When stopper has started
val stopperStarted = new Semaphore(0)
// When stop has returned // When stopper has returned
val stopReturned = new Semaphore(1) val stopperReturned = new Semaphore(0)
class BlockingListener extends SparkListener { class BlockingListener extends SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd) = { override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
listenerStarted.release()
listenerWait.acquire() listenerWait.acquire()
drained = true drained = true
} }
...@@ -97,23 +104,26 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc ...@@ -97,23 +104,26 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
bus.start() bus.start()
bus.post(SparkListenerJobEnd(0, JobSucceeded)) bus.post(SparkListenerJobEnd(0, JobSucceeded))
// the queue should not drain immediately listenerStarted.acquire()
// Listener should be blocked after start
assert(!drained) assert(!drained)
new Thread("ListenerBusStopper") { new Thread("ListenerBusStopper") {
override def run() { override def run() {
stopperStarted.release()
// stop() will block until notify() is called below // stop() will block until notify() is called below
bus.stop() bus.stop()
stopReturned.release(1) stopperReturned.release()
} }
}.start() }.start()
while (!bus.stopCalled) { stopperStarted.acquire()
Thread.sleep(10) // Listener should remain blocked after stopper started
} assert(!drained)
// unblock Listener to let queue drain
listenerWait.release() listenerWait.release()
stopReturned.acquire() stopperReturned.acquire()
assert(drained) assert(drained)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment