diff --git a/docs/configuration.md b/docs/configuration.md
index b84104cc7e6535b0ee5101984a59a5251939a475..0aea23ab595021292d503791afbacae7baf41612 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -773,6 +773,15 @@ Apart from these, the following properties are also available, and may be useful
     into blocks of data before storing them in Spark.
   </td>
 </tr>
+<tr>
+  <td><code>spark.streaming.receiver.maxRate</code></td>
+  <td>infinite</td>
+  <td>
+    Maximum rate (records per second) at which each receiver pushes data into blocks.
+    Effectively, each stream consumes at most this number of records per second.
+    Setting this to 0 or a negative number puts no limit on the rate.
+  </td>
+</tr>
 <tr>
   <td><code>spark.streaming.unpersist</code></td>
   <td>true</td>
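
For context on the driver side, this is how the new setting would be applied; a minimal sketch, where the app name and rate value are illustrative and not part of the patch:

```scala
import org.apache.spark.SparkConf

// Hypothetical driver configuration: cap every receiver in the
// application at 100 records per second (value chosen for illustration).
val conf = new SparkConf()
  .setAppName("RateLimitedApp")
  .set("spark.streaming.receiver.maxRate", "100")
```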
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 78cc2daa56e53eca42f3bc3c0b64117925ae2b14..0316b6862f1954faf7548c604f0fdd8c73166390 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -44,7 +44,7 @@ private[streaming] class BlockGenerator(
     listener: BlockGeneratorListener,
     receiverId: Int,
     conf: SparkConf
-  ) extends Logging {
+  ) extends RateLimiter(conf) with Logging {
 
   private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
 
@@ -81,6 +81,7 @@ private[streaming] class BlockGenerator(
    * will be periodically pushed into BlockManager.
    */
   def += (data: Any): Unit = synchronized {
+    waitToPush()
     currentBuffer += data
   }
 
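Since `+=` is the path taken when a receiver stores records one at a time, any such receiver is throttled transparently by this change. A minimal sketch of a custom receiver that would now be subject to the limit (the `CountingReceiver` class is hypothetical, not part of the patch):

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver emitting consecutive integers. Each single-record
// store(...) is routed through BlockGenerator.+=, so it blocks in
// waitToPush() whenever spark.streaming.receiver.maxRate is exceeded.
class CountingReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
  def onStart() {
    new Thread("counting-receiver") {
      override def run() {
        var i = 0
        while (!isStopped()) {
          store(i)  // throttled by the rate limiter
          i += 1
        }
      }
    }.start()
  }
  def onStop() { }
}
```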
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e4f6ba626ebbf3ce0c39e2a72d13aa52b4a07b29
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.util.concurrent.TimeUnit._
+import org.apache.spark.{Logging, SparkConf}
+
+/** Provides a waitToPush() method to limit the rate at which receivers consume data.
+  *
+  * waitToPush() will block the calling thread if too many messages have been pushed too
+  * quickly, and only return when it is safe to push a new message. It assumes that only
+  * one message is pushed at a time.
+  *
+  * The Spark configuration spark.streaming.receiver.maxRate gives the maximum number of
+  * messages per second that each receiver will accept.
+  *
+  * @param conf Spark configuration
+  */
+private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
+
+  private var lastSyncTime = System.nanoTime
+  private var messagesWrittenSinceSync = 0L
+  private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
+  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
+
+  def waitToPush() {
+    if (desiredRate <= 0) {
+      return
+    }
+    val now = System.nanoTime
+    val elapsedNanosecs = math.max(now - lastSyncTime, 1)
+    val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
+    if (rate < desiredRate) {
+      // It's okay to write; just update some variables and return
+      messagesWrittenSinceSync += 1
+      if (now > lastSyncTime + SYNC_INTERVAL) {
+        // Sync interval has passed; let's resync
+        lastSyncTime = now
+        messagesWrittenSinceSync = 1
+      }
+    } else {
+      // Calculate how much time we should sleep to bring ourselves to the desired rate.
+      val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
+      val elapsedTimeInMillis = elapsedNanosecs / 1000000
+      val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
+      if (sleepTimeInMillis > 0) {
+        logTrace("Natural rate is " + rate + " per second but desired rate is " +
+          desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
+        Thread.sleep(sleepTimeInMillis)
+      }
+      waitToPush()
+    }
+  }
+}
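To make the sleep computation concrete: if desiredRate is 200 and 150 messages were written in the first 500 ms, the observed rate is 300 msgs/sec; targetTimeInMillis = 150 * 1000 / 200 = 750, so the thread sleeps 750 - 500 = 250 ms before re-checking. And since RateLimiter is abstract, exercising waitToPush() in isolation needs a trivial concrete subclass; a sketch (the subclass is hypothetical, and because of the private[receiver] modifier it only compiles inside the org.apache.spark.streaming.receiver package):

```scala
import org.apache.spark.SparkConf

// Hypothetical no-op subclass, just to time waitToPush() on its own.
class PlainRateLimiter(conf: SparkConf) extends RateLimiter(conf)

val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "50")
val limiter = new PlainRateLimiter(conf)

val start = System.currentTimeMillis
for (_ <- 1 to 100) limiter.waitToPush()  // 100 messages at 50/sec ~ 2 seconds
println("pushed 100 messages in " + (System.currentTimeMillis - start) + " ms")
```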
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index d9ac3c91f6e36daacdc8564154a01e2b6a9be31a..f4e11f975de94e7966583d3b3e8f05234ede7cf4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -145,6 +145,44 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
     assert(recordedData.toSet === generatedData.toSet)
   }
 
+  test("block generator throttling") {
+    val blockGeneratorListener = new FakeBlockGeneratorListener
+    val blockInterval = 50
+    val maxRate = 200
+    val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
+      .set("spark.streaming.receiver.maxRate", maxRate.toString)
+    val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
+    val expectedBlocks = 20
+    val waitTime = expectedBlocks * blockInterval
+    val expectedMessages = maxRate * waitTime / 1000
+    val expectedMessagesPerBlock = maxRate * blockInterval / 1000
+    val generatedData = new ArrayBuffer[Int]
+
+    // Generate blocks
+    val startTime = System.currentTimeMillis()
+    blockGenerator.start()
+    var count = 0
+    while (System.currentTimeMillis - startTime < waitTime) {
+      blockGenerator += count
+      generatedData += count
+      count += 1
+      Thread.sleep(1)
+    }
+    blockGenerator.stop()
+
+    val recordedData = blockGeneratorListener.arrayBuffers
+    assert(recordedData.size > 0)
+    assert(recordedData.flatten.toSet === generatedData.toSet)
+    // The total number of recorded messages should be close to the expected count
+    assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
+      recordedData.flatten.size <= expectedMessages * 1.1)
+    // The first and last blocks may be incomplete, so slice them out
+    recordedData.slice(1, recordedData.size - 1).foreach { block =>
+      assert(block.size >= expectedMessagesPerBlock * 0.8 &&
+        block.size <= expectedMessagesPerBlock * 1.2)
+    }
+  }
+
   /**
    * An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
    */
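
A note on the arithmetic behind the throttling test above: with blockInterval = 50 ms and expectedBlocks = 20, waitTime is 20 * 50 = 1000 ms; at maxRate = 200 that yields expectedMessages = 200 * 1000 / 1000 = 200 in total and expectedMessagesPerBlock = 200 * 50 / 1000 = 10 per block, which the assertions then check within 10% and 20% tolerance bands respectively.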