diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 27d32b5dca4315c8a446ae889dc6f464c79210da..0c5f3f22e31e8cfcbf4221feacf4d1fdc8ce3d11 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -5,3 +5,4 @@ org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
+org.apache.spark.sql.execution.streaming.RateSourceProvider
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e61a8eb628891bb7701e17d02586d7cec6ccfe01
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
+
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ManualClock, SystemClock}
+
+/**
+ *  A source that generates increment long values with timestamps. Each generated row has two
+ *  columns: a timestamp column for the generated time and an auto increment long column starting
+ *  with 0L.
+ *
+ *  This source supports the following options:
+ *  - `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second.
+ *  - `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed
+ *    becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer
+ *    seconds.
+ *  - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the
+ *    generated rows. The source will try its best to reach `rowsPerSecond`, but the query may
+ *    be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
+ */
+class RateSourceProvider extends StreamSourceProvider with DataSourceRegister {
+
+  override def sourceSchema(
+      sqlContext: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) =
+    (shortName(), RateSourceProvider.SCHEMA)
+
+  override def createSource(
+      sqlContext: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    val params = CaseInsensitiveMap(parameters)
+
+    val rowsPerSecond = params.get("rowsPerSecond").map(_.toLong).getOrElse(1L)
+    if (rowsPerSecond <= 0) {
+      throw new IllegalArgumentException(
+        s"Invalid value '${params("rowsPerSecond")}'. The option 'rowsPerSecond' " +
+          "must be positive")
+    }
+
+    val rampUpTimeSeconds =
+      params.get("rampUpTime").map(JavaUtils.timeStringAsSec(_)).getOrElse(0L)
+    if (rampUpTimeSeconds < 0) {
+      throw new IllegalArgumentException(
+        s"Invalid value '${params("rampUpTime")}'. The option 'rampUpTime' " +
+          "must not be negative")
+    }
+
+    val numPartitions = params.get("numPartitions").map(_.toInt).getOrElse(
+      sqlContext.sparkContext.defaultParallelism)
+    if (numPartitions <= 0) {
+      throw new IllegalArgumentException(
+        s"Invalid value '${params("numPartitions")}'. The option 'numPartitions' " +
+          "must be positive")
+    }
+
+    new RateStreamSource(
+      sqlContext,
+      metadataPath,
+      rowsPerSecond,
+      rampUpTimeSeconds,
+      numPartitions,
+      params.get("useManualClock").map(_.toBoolean).getOrElse(false) // Only for testing
+    )
+  }
+  override def shortName(): String = "rate"
+}
+
+object RateSourceProvider {
+  val SCHEMA =
+    StructType(StructField("timestamp", TimestampType) :: StructField("value", LongType) :: Nil)
+
+  val VERSION = 1
+}
+
+class RateStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    rowsPerSecond: Long,
+    rampUpTimeSeconds: Long,
+    numPartitions: Int,
+    useManualClock: Boolean) extends Source with Logging {
+
+  import RateSourceProvider._
+  import RateStreamSource._
+
+  val clock = if (useManualClock) new ManualClock else new SystemClock
+
+  private val maxSeconds = Long.MaxValue / rowsPerSecond
+
+  if (rampUpTimeSeconds > maxSeconds) {
+    throw new ArithmeticException(
+      s"Integer overflow. Max offset with $rowsPerSecond rowsPerSecond" +
+        s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
+  }
+
+  private val startTimeMs = {
+    val metadataLog =
+      new HDFSMetadataLog[LongOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: LongOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush
+        }
+
+        override def deserialize(in: InputStream): LongOffset = {
+          val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+          // HDFSMetadataLog guarantees that it never creates a partial file.
+          assert(content.length != 0)
+          if (content(0) == 'v') {
+            val indexOfNewLine = content.indexOf("\n")
+            if (indexOfNewLine > 0) {
+              val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+              LongOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+            } else {
+              throw new IllegalStateException(
+                s"Log file was malformed: failed to detect the log file version line.")
+            }
+          } else {
+            throw new IllegalStateException(
+              s"Log file was malformed: failed to detect the log file version line.")
+          }
+        }
+      }
+
+    metadataLog.get(0).getOrElse {
+      val offset = LongOffset(clock.getTimeMillis())
+      metadataLog.add(0, offset)
+      logInfo(s"Start time: $offset")
+      offset
+    }.offset
+  }
+
+  /** When the system time runs backward, "lastTimeMs" will make sure we are still monotonic. */
+  @volatile private var lastTimeMs = startTimeMs
+
+  override def schema: StructType = RateSourceProvider.SCHEMA
+
+  override def getOffset: Option[Offset] = {
+    val now = clock.getTimeMillis()
+    if (lastTimeMs < now) {
+      lastTimeMs = now
+    }
+    Some(LongOffset(TimeUnit.MILLISECONDS.toSeconds(lastTimeMs - startTimeMs)))
+  }
+
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    val startSeconds = start.flatMap(LongOffset.convert(_).map(_.offset)).getOrElse(0L)
+    val endSeconds = LongOffset.convert(end).map(_.offset).getOrElse(0L)
+    assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)")
+    if (endSeconds > maxSeconds) {
+      throw new ArithmeticException("Integer overflow. Max offset with " +
+        s"$rowsPerSecond rowsPerSecond is $maxSeconds, but it's $endSeconds now.")
+    }
+    // Fix "lastTimeMs" for recovery
+    if (lastTimeMs < TimeUnit.SECONDS.toMillis(endSeconds) + startTimeMs) {
+      lastTimeMs = TimeUnit.SECONDS.toMillis(endSeconds) + startTimeMs
+    }
+    val rangeStart = valueAtSecond(startSeconds, rowsPerSecond, rampUpTimeSeconds)
+    val rangeEnd = valueAtSecond(endSeconds, rowsPerSecond, rampUpTimeSeconds)
+    logDebug(s"startSeconds: $startSeconds, endSeconds: $endSeconds, " +
+      s"rangeStart: $rangeStart, rangeEnd: $rangeEnd")
+
+    if (rangeStart == rangeEnd) {
+      return sqlContext.internalCreateDataFrame(sqlContext.sparkContext.emptyRDD, schema)
+    }
+
+    val localStartTimeMs = startTimeMs + TimeUnit.SECONDS.toMillis(startSeconds)
+    val relativeMsPerValue =
+      TimeUnit.SECONDS.toMillis(endSeconds - startSeconds).toDouble / (rangeEnd - rangeStart)
+
+    val rdd = sqlContext.sparkContext.range(rangeStart, rangeEnd, 1, numPartitions).map { v =>
+      val relative = math.round((v - rangeStart) * relativeMsPerValue)
+      InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), v)
+    }
+    sqlContext.internalCreateDataFrame(rdd, schema)
+  }
+
+  override def stop(): Unit = {}
+
+  override def toString: String = s"RateSource[rowsPerSecond=$rowsPerSecond, " +
+    s"rampUpTimeSeconds=$rampUpTimeSeconds, numPartitions=$numPartitions]"
+}
+
+object RateStreamSource {
+
+  /** Calculate the end value we will emit at the time `seconds`. */
+  def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
+    // E.g., rampUpTimeSeconds = 4, rowsPerSecond = 10
+    // Then speedDeltaPerSecond = 2
+    //
+    // seconds   = 0 1 2  3  4  5  6
+    // speed     = 0 2 4  6  8 10 10 (speedDeltaPerSecond * seconds)
+    // end value = 0 2 6 12 20 30 40 (0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 2
+    val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
+    if (seconds <= rampUpTimeSeconds) {
+      // Calculate "(0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 2" in a special way to
+      // avoid overflow
+      if (seconds % 2 == 1) {
+        (seconds + 1) / 2 * speedDeltaPerSecond * seconds
+      } else {
+        seconds / 2 * speedDeltaPerSecond * (seconds + 1)
+      }
+    } else {
+      // rampUpPart is just a special case of the above formula: rampUpTimeSeconds == seconds
+      val rampUpPart = valueAtSecond(rampUpTimeSeconds, rowsPerSecond, rampUpTimeSeconds)
+      rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..bdba536425a431a8b3cf2223cf4e5bde2b22ee36
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
+import org.apache.spark.util.ManualClock
+
+class RateSourceSuite extends StreamTest {
+
+  import testImplicits._
+
+  case class AdvanceRateManualClock(seconds: Long) extends AddData {
+    override def addData(query: Option[StreamExecution]): (Source, Offset) = {
+      assert(query.nonEmpty)
+      val rateSource = query.get.logicalPlan.collect {
+        case StreamingExecutionRelation(source, _) if source.isInstanceOf[RateStreamSource] =>
+          source.asInstanceOf[RateStreamSource]
+      }.head
+      rateSource.clock.asInstanceOf[ManualClock].advance(TimeUnit.SECONDS.toMillis(seconds))
+      (rateSource, rateSource.getOffset.get)
+    }
+  }
+
+  test("basic") {
+    val input = spark.readStream
+      .format("rate")
+      .option("rowsPerSecond", "10")
+      .option("useManualClock", "true")
+      .load()
+    testStream(input)(
+      AdvanceRateManualClock(seconds = 1),
+      CheckLastBatch((0 until 10).map(v => new java.sql.Timestamp(v * 100L) -> v): _*),
+      StopStream,
+      StartStream(),
+      // Advance 2 seconds because creating a new RateSource will also create a new ManualClock
+      AdvanceRateManualClock(seconds = 2),
+      CheckLastBatch((10 until 20).map(v => new java.sql.Timestamp(v * 100L) -> v): _*)
+    )
+  }
+
+  test("uniform distribution of event timestamps") {
+    val input = spark.readStream
+      .format("rate")
+      .option("rowsPerSecond", "1500")
+      .option("useManualClock", "true")
+      .load()
+      .as[(java.sql.Timestamp, Long)]
+      .map(v => (v._1.getTime, v._2))
+    val expectedAnswer = (0 until 1500).map { v =>
+      (math.round(v * (1000.0 / 1500)), v)
+    }
+    testStream(input)(
+      AdvanceRateManualClock(seconds = 1),
+      CheckLastBatch(expectedAnswer: _*)
+    )
+  }
+
+  test("valueAtSecond") {
+    import RateStreamSource._
+
+    assert(valueAtSecond(seconds = 0, rowsPerSecond = 5, rampUpTimeSeconds = 0) === 0)
+    assert(valueAtSecond(seconds = 1, rowsPerSecond = 5, rampUpTimeSeconds = 0) === 5)
+
+    assert(valueAtSecond(seconds = 0, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 0)
+    assert(valueAtSecond(seconds = 1, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 1)
+    assert(valueAtSecond(seconds = 2, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 3)
+    assert(valueAtSecond(seconds = 3, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 8)
+
+    assert(valueAtSecond(seconds = 0, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 0)
+    assert(valueAtSecond(seconds = 1, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 2)
+    assert(valueAtSecond(seconds = 2, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 6)
+    assert(valueAtSecond(seconds = 3, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 12)
+    assert(valueAtSecond(seconds = 4, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 20)
+    assert(valueAtSecond(seconds = 5, rowsPerSecond = 10, rampUpTimeSeconds = 4) === 30)
+  }
+
+  test("rampUpTime") {
+    val input = spark.readStream
+      .format("rate")
+      .option("rowsPerSecond", "10")
+      .option("rampUpTime", "4s")
+      .option("useManualClock", "true")
+      .load()
+      .as[(java.sql.Timestamp, Long)]
+      .map(v => (v._1.getTime, v._2))
+    testStream(input)(
+      AdvanceRateManualClock(seconds = 1),
+      CheckLastBatch((0 until 2).map(v => v * 500 -> v): _*), // speed = 2
+      AdvanceRateManualClock(seconds = 1),
+      CheckLastBatch((2 until 6).map(v => 1000 + (v - 2) * 250 -> v): _*), // speed = 4
+      AdvanceRateManualClock(seconds = 1),
+      CheckLastBatch({
+        Seq(2000 -> 6, 2167 -> 7, 2333 -> 8, 2500 -> 9, 2667 -> 10, 2833 -> 11)
+      }: _*), // speed = 6
+      AdvanceRateManualClock(seconds = 1),
+      CheckLastBatch((12 until 20).map(v => 3000 + (v - 12) * 125 -> v): _*), // speed = 8
+      AdvanceRateManualClock(seconds = 1),
+      // Now we should reach full speed
+      CheckLastBatch((20 until 30).map(v => 4000 + (v - 20) * 100 -> v): _*), // speed = 10
+      AdvanceRateManualClock(seconds = 1),
+      CheckLastBatch((30 until 40).map(v => 5000 + (v - 30) * 100 -> v): _*), // speed = 10
+      AdvanceRateManualClock(seconds = 1),
+      CheckLastBatch((40 until 50).map(v => 6000 + (v - 40) * 100 -> v): _*) // speed = 10
+    )
+  }
+
+  test("numPartitions") {
+    val input = spark.readStream
+      .format("rate")
+      .option("rowsPerSecond", "10")
+      .option("numPartitions", "6")
+      .option("useManualClock", "true")
+      .load()
+      .select(spark_partition_id())
+      .distinct()
+    testStream(input)(
+      AdvanceRateManualClock(1),
+      CheckLastBatch((0 until 6): _*)
+    )
+  }
+
+  testQuietly("overflow") {
+    val input = spark.readStream
+      .format("rate")
+      .option("rowsPerSecond", Long.MaxValue.toString)
+      .option("useManualClock", "true")
+      .load()
+      .select(spark_partition_id())
+      .distinct()
+    testStream(input)(
+      AdvanceRateManualClock(2),
+      ExpectFailure[ArithmeticException](t => {
+        Seq("overflow", "rowsPerSecond").foreach { msg =>
+          assert(t.getMessage.contains(msg))
+        }
+      })
+    )
+  }
+
+  testQuietly("illegal option values") {
+    def testIllegalOptionValue(
+        option: String,
+        value: String,
+        expectedMessages: Seq[String]): Unit = {
+      val e = intercept[StreamingQueryException] {
+        spark.readStream
+          .format("rate")
+          .option(option, value)
+          .load()
+          .writeStream
+          .format("console")
+          .start()
+          .awaitTermination()
+      }
+      assert(e.getCause.isInstanceOf[IllegalArgumentException])
+      for (msg <- expectedMessages) {
+        assert(e.getCause.getMessage.contains(msg))
+      }
+    }
+
+    testIllegalOptionValue("rowsPerSecond", "-1", Seq("-1", "rowsPerSecond", "positive"))
+    testIllegalOptionValue("numPartitions", "-1", Seq("-1", "numPartitions", "positive"))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 5bc36dd30f6d1f4d2b5d021b1be27926976b50a6..2a4039cc5831ab3fd4868952e4e5260ab35a1ec2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -172,8 +172,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
    *
    * @param isFatalError if this is a fatal error. If so, the error should also be caught by
    *                     UncaughtExceptionHandler.
+   * @param assertFailure a function to verify the error.
    */
   case class ExpectFailure[T <: Throwable : ClassTag](
+      assertFailure: Throwable => Unit = _ => {},
       isFatalError: Boolean = false) extends StreamAction {
     val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
     override def toString(): String =
@@ -455,6 +457,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                     s"\tExpected: ${ef.causeClass}\n\tReturned: $streamThreadDeathCause")
                 streamThreadDeathCause = null
               }
+              ef.assertFailure(exception.getCause)
             } catch {
               case _: InterruptedException =>
               case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>