Skip to content
Snippets Groups Projects
Commit f5ae38af authored by Reynold Xin's avatar Reynold Xin Committed by Matei Zaharia
Browse files

SPARK-1158: Fix flaky RateLimitedOutputStreamSuite.

There was actually a problem with the RateLimitedOutputStream implementation where the first second doesn't write anything because of integer rounding.

So RateLimitedOutputStream was overly aggressive in throttling.

Author: Reynold Xin <rxin@apache.org>

Closes #55 from rxin/ratelimitest and squashes the following commits:

52ce1b7 [Reynold Xin] SPARK-1158: Fix flaky RateLimitedOutputStreamSuite.
parent 923dba50
No related branches found
No related tags found
No related merge requests found
...@@ -22,12 +22,20 @@ import scala.annotation.tailrec ...@@ -22,12 +22,20 @@ import scala.annotation.tailrec
import java.io.OutputStream import java.io.OutputStream
import java.util.concurrent.TimeUnit._ import java.util.concurrent.TimeUnit._
import org.apache.spark.Logging
private[streaming] private[streaming]
class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream { class RateLimitedOutputStream(out: OutputStream, desiredBytesPerSec: Int)
val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS) extends OutputStream
val CHUNK_SIZE = 8192 with Logging {
var lastSyncTime = System.nanoTime
var bytesWrittenSinceSync: Long = 0 require(desiredBytesPerSec > 0)
private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
private val CHUNK_SIZE = 8192
private var lastSyncTime = System.nanoTime
private var bytesWrittenSinceSync = 0L
override def write(b: Int) { override def write(b: Int) {
waitToWrite(1) waitToWrite(1)
...@@ -59,9 +67,9 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu ...@@ -59,9 +67,9 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
@tailrec @tailrec
private def waitToWrite(numBytes: Int) { private def waitToWrite(numBytes: Int) {
val now = System.nanoTime val now = System.nanoTime
val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS) val elapsedNanosecs = math.max(now - lastSyncTime, 1)
val rate = bytesWrittenSinceSync.toDouble / elapsedSecs val rate = bytesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
if (rate < bytesPerSec) { if (rate < desiredBytesPerSec) {
// It's okay to write; just update some variables and return // It's okay to write; just update some variables and return
bytesWrittenSinceSync += numBytes bytesWrittenSinceSync += numBytes
if (now > lastSyncTime + SYNC_INTERVAL) { if (now > lastSyncTime + SYNC_INTERVAL) {
...@@ -71,13 +79,14 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu ...@@ -71,13 +79,14 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
} }
} else { } else {
// Calculate how much time we should sleep to bring ourselves to the desired rate. // Calculate how much time we should sleep to bring ourselves to the desired rate.
// Based on throttler in Kafka val targetTimeInMillis = bytesWrittenSinceSync * 1000 / desiredBytesPerSec
// scalastyle:off val elapsedTimeInMillis = elapsedNanosecs / 1000000
// (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala) val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
// scalastyle:on if (sleepTimeInMillis > 0) {
val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), logTrace("Natural rate is " + rate + " per second but desired rate is " +
SECONDS) desiredBytesPerSec + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
if (sleepTime > 0) Thread.sleep(sleepTime) Thread.sleep(sleepTimeInMillis)
}
waitToWrite(numBytes) waitToWrite(numBytes)
} }
} }
......
...@@ -17,10 +17,11 @@ ...@@ -17,10 +17,11 @@
package org.apache.spark.streaming.util package org.apache.spark.streaming.util
import org.scalatest.FunSuite
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.util.concurrent.TimeUnit._ import java.util.concurrent.TimeUnit._
import org.scalatest.FunSuite
class RateLimitedOutputStreamSuite extends FunSuite { class RateLimitedOutputStreamSuite extends FunSuite {
private def benchmark[U](f: => U): Long = { private def benchmark[U](f: => U): Long = {
...@@ -29,12 +30,14 @@ class RateLimitedOutputStreamSuite extends FunSuite { ...@@ -29,12 +30,14 @@ class RateLimitedOutputStreamSuite extends FunSuite {
System.nanoTime - start System.nanoTime - start
} }
ignore("write") { test("write") {
val underlying = new ByteArrayOutputStream val underlying = new ByteArrayOutputStream
val data = "X" * 41000 val data = "X" * 41000
val stream = new RateLimitedOutputStream(underlying, 10000) val stream = new RateLimitedOutputStream(underlying, desiredBytesPerSec = 10000)
val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) } val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) }
assert(SECONDS.convert(elapsedNs, NANOSECONDS) == 4)
assert(underlying.toString("UTF-8") == data) // We accept anywhere from 4.0 to 4.99999 seconds since the value is rounded down.
assert(SECONDS.convert(elapsedNs, NANOSECONDS) === 4)
assert(underlying.toString("UTF-8") === data)
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment