diff --git a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 868df64e8c94449ad5cd39d1c85f9ba68b636fd7..98fe38e826afb6caca440ea0429b32402821fdf2 100644
--- a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -122,14 +122,23 @@ public class JavaKafkaStreamSuite implements Serializable {
     ssc.start();
 
     long startTime = System.currentTimeMillis();
-    boolean sizeMatches = false;
-    while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
-      sizeMatches = sent.size() == result.size();
+    AssertionError lastError = null;
+    while (System.currentTimeMillis() - startTime < 20000) {
+      try {
+        Assert.assertEquals(sent.size(), result.size());
+        for (Map.Entry<String, Integer> e : sent.entrySet()) {
+          Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
+        }
+        return;
+      } catch (AssertionError e) {
+        lastError = e;
+      }
       Thread.sleep(200);
     }
-    Assert.assertEquals(sent.size(), result.size());
-    for (Map.Entry<String, Integer> e : sent.entrySet()) {
-      Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
+    if (lastError != null) {
+      throw lastError;
+    } else {
+      Assert.fail("timeout");
     }
   }
 }