diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 87bc20f79c3cdec19ee5147481021b9ad4f14bb3..f269cb74e0c2bb269e7d1b8b49a17b6098d8bf52 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -557,6 +557,9 @@ class BasicOperationsSuite extends TestSuiteBase {
     withTestServer(new TestServer()) { testServer =>
       withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
         testServer.start()
+
+        val batchCounter = new BatchCounter(ssc)
+
         // Set up the streaming context and input streams
         val networkStream =
           ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
@@ -587,7 +590,11 @@ class BasicOperationsSuite extends TestSuiteBase {
         for (i <- 0 until input.size) {
           testServer.send(input(i).toString + "\n")
           Thread.sleep(200)
+          val numCompletedBatches = batchCounter.getNumCompletedBatches
           clock.advance(batchDuration.milliseconds)
+          if (!batchCounter.waitUntilBatchesCompleted(numCompletedBatches + 1, 5000)) {
+            fail("Batch took more than 5 seconds to complete")
+          }
           collectRddInfo()
         }