diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index cbac4c13ca6fed29fa7a5ec5739c39a0bba32c95..dec3316bf7745912ee21c9d349176cea5af523dc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
 
 import org.apache.spark.Logging
 
@@ -36,16 +36,24 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
   private var started = false
+
+  // A counter that represents the number of events produced and consumed in the queue
+  private val eventLock = new Semaphore(0)
+
   private val listenerThread = new Thread("SparkListenerBus") {
     setDaemon(true)
     override def run() {
       while (true) {
-        val event = eventQueue.take
-        if (event == SparkListenerShutdown) {
-          // Get out of the while loop and shutdown the daemon thread
-          return
+        eventLock.acquire()
+        // Atomically remove and process this event
+        LiveListenerBus.this.synchronized {
+          val event = eventQueue.poll
+          if (event == SparkListenerShutdown) {
+            // Get out of the while loop and shutdown the daemon thread
+            return
+          }
+          Option(event).foreach(postToAll)
         }
-        postToAll(event)
       }
     }
   }
@@ -67,7 +75,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
 
   def post(event: SparkListenerEvent) {
     val eventAdded = eventQueue.offer(event)
-    if (!eventAdded && !queueFullErrorMessageLogged) {
+    if (eventAdded) {
+      eventLock.release()
+    } else if (!queueFullErrorMessageLogged) {
       logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
         "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
         "rate at which tasks are being started by the scheduler.")
@@ -76,13 +86,13 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
   }
 
   /**
-   * Waits until there are no more events in the queue, or until the specified time has elapsed.
-   * Used for testing only. Returns true if the queue has emptied and false is the specified time
+   * For testing only. Wait until there are no more events in the queue, or until the specified
+   * time has elapsed. Return true if the queue has emptied and false is the specified time
    * elapsed before the queue emptied.
    */
   def waitUntilEmpty(timeoutMillis: Int): Boolean = {
     val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!eventQueue.isEmpty) {
+    while (!queueIsEmpty) {
       if (System.currentTimeMillis > finishTime) {
         return false
       }
@@ -93,6 +103,14 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
     true
   }
 
+  /**
+   * Return whether the event queue is empty.
+   *
+   * The use of synchronized here guarantees that all events that once belonged to this queue
+   * have already been processed by all attached listeners, if this returns true.
+   */
+  def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
+
   def stop() {
     if (!started) {
       throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index ba048ced32a934b8eb6d26cdab61a9fac2a0b2da..4e9fd07e68a21571f62a2fad8c400f405ec4ee23 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -29,7 +29,8 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.executor.TaskMetrics
 
 class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
-    with BeforeAndAfter with BeforeAndAfterAll {
+  with BeforeAndAfter with BeforeAndAfterAll {
+
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
@@ -37,7 +38,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     sc = new SparkContext("local", "SparkListenerSuite")
   }
 
-  override def afterAll {
+  override def afterAll() {
     System.clearProperty("spark.akka.frameSize")
   }