Commit 05ac9940 authored by Patrick Wendell

Adding tests

parent 2fda84fe
@@ -157,19 +157,6 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(partitions2(0).length > 0)
     assert(partitions2(19).length > 0)
     assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
-
-    // Coalesce partitions - no shuffle
-    val repartitioned3 = data.repartition(2, skipShuffle = true)
-    assert(repartitioned3.partitions.size == 2)
-    val partitions3 = repartitioned3.glom().collect()
-    assert(partitions3(0).toList === (1 to 500).toList)
-    assert(partitions3(1).toList === (501 to 1000).toList)
-    assert(repartitioned3.collect().toSet === (1 to 1000).toSet)
-
-    // Split partitions - no shuffle (should throw exn)
-    intercept[IllegalArgumentException] {
-      data.repartition(20, skipShuffle = true)
-    }
   }

   test("coalesced RDDs") {
...
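The block removed above exercised a `skipShuffle` flag on `repartition`; this commit drops those assertions from the suite. The surviving tests rely on `glom()`, which wraps each partition's contents in an array so assertions can see partition boundaries directly. A minimal standalone sketch of that pattern, assuming only a pre-existing local SparkContext named `sc` (illustrative, not part of the commit):

// Sketch of the glom()-based partition inspection used in the tests above.
// Assumes a local SparkContext `sc`; values are illustrative.
val data = sc.parallelize(1 to 1000, 10)
val repartitioned = data.repartition(20)           // shuffle into 20 partitions
val partitions = repartitioned.glom().collect()    // Array[Array[Int]], one entry per partition
assert(partitions.length == 20)
assert(partitions.flatten.toSet == (1 to 1000).toSet)  // nothing lost or duplicated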
@@ -82,6 +82,44 @@ class BasicOperationsSuite extends TestSuiteBase {
     testOperation(input, operation, output, true)
   }

+  test("repartition (more partitions)") {
+    val input = Seq(1 to 100, 101 to 200, 201 to 300)
+    val operation = (r: DStream[Int]) => r.repartition(5)
+    val ssc = setupStreams(input, operation, 2)
+    val output = runStreamsWithPartitions(ssc, 3, 3)
+    assert(output.size === 3)
+    val first = output(0)
+    val second = output(1)
+    val third = output(2)
+
+    assert(first.size === 5)
+    assert(second.size === 5)
+    assert(third.size === 5)
+
+    assert(first.flatten.toSet === (1 to 100).toSet)
+    assert(second.flatten.toSet === (101 to 200).toSet)
+    assert(third.flatten.toSet === (201 to 300).toSet)
+  }
+
+  test("repartition (fewer partitions)") {
+    val input = Seq(1 to 100, 101 to 200, 201 to 300)
+    val operation = (r: DStream[Int]) => r.repartition(2)
+    val ssc = setupStreams(input, operation, 5)
+    val output = runStreamsWithPartitions(ssc, 3, 3)
+    assert(output.size === 3)
+    val first = output(0)
+    val second = output(1)
+    val third = output(2)
+
+    assert(first.size === 2)
+    assert(second.size === 2)
+    assert(third.size === 2)
+
+    assert(first.flatten.toSet === (1 to 100).toSet)
+    assert(second.flatten.toSet === (101 to 200).toSet)
+    assert(third.flatten.toSet === (201 to 300).toSet)
+  }
+
   test("groupByKey") {
     testOperation(
       Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
...
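Both tests above check the same invariants once the return shape of `runStreamsWithPartitions` is spelled out: one entry per batch, each a sequence of partitions, each a sequence of items. A hypothetical value of `output` for the fewer-partitions test (the split of items across the two partitions depends on the shuffle, so only the flattened sets are stable; the literal values here are truncated):

// Hypothetical, truncated shape of `output` for repartition(2) over three batches.
val output: Seq[Seq[Seq[Int]]] = Seq(
  Seq(Seq(1, 3, 5), Seq(2, 4, 6)),    // batch 1: 2 partitions; real union is 1 to 100
  Seq(Seq(101, 103), Seq(102, 104)),  // batch 2; real union is 101 to 200
  Seq(Seq(201, 203), Seq(202, 204))   // batch 3; real union is 201 to 300
)
assert(output.forall(_.size == 2))    // two partitions in every batch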
@@ -60,6 +60,8 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
 /**
  * This is an output stream just for the testsuites. All the output is collected into an
  * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ *
+ * The buffer contains a sequence of RDDs, each containing a sequence of items.
  */
 class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
@@ -75,6 +77,27 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
   }
 }

+/**
+ * This is an output stream just for the testsuites. All the output is collected into an
+ * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ *
+ * The buffer contains a sequence of RDDs, each containing a sequence of partitions, each
+ * containing a sequence of items.
+ */
+class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]])
+  extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+    val collected = rdd.glom().collect().map(_.toSeq)
+    output += collected
+  }) {
+
+  // This is to clear the output buffer every time it is read from a checkpoint
+  @throws(classOf[IOException])
+  private def readObject(ois: ObjectInputStream) {
+    ois.defaultReadObject()
+    output.clear()
+  }
+}
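The only behavioral difference from `TestOutputStream` is the `glom()` call: instead of recording each batch as one flat sequence, it records one sequence per partition. A small sketch of that distinction, assuming a local SparkContext `sc` (illustrative, not part of the commit):

// collect() flattens an RDD; glom().collect() keeps partition boundaries.
val rdd = sc.parallelize(Seq(1, 2, 3, 4), 2)
val flat: Seq[Int] = rdd.collect().toSeq                            // Seq(1, 2, 3, 4)
val byPartition: Seq[Seq[Int]] = rdd.glom().collect().map(_.toSeq).toSeq
// byPartition == Seq(Seq(1, 2), Seq(3, 4)): one inner Seq per partition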
 /**
  * This is the base trait for Spark Streaming testsuites. This provides basic functionality
  * to run a user-defined set of input on user-defined stream operations, and verify the output.
@@ -108,7 +131,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
    */
   def setupStreams[U: ClassManifest, V: ClassManifest](
       input: Seq[Seq[U]],
-      operation: DStream[U] => DStream[V]
+      operation: DStream[U] => DStream[V],
+      numPartitions: Int = numInputPartitions
     ): StreamingContext = {
     // Create StreamingContext
@@ -118,9 +142,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     }

     // Setup the stream computation
-    val inputStream = new TestInputStream(ssc, input, numInputPartitions)
+    val inputStream = new TestInputStream(ssc, input, numPartitions)
     val operatedStream = operation(inputStream)
-    val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[V]] with SynchronizedBuffer[Seq[V]])
+    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
+      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
     ssc.registerInputStream(inputStream)
     ssc.registerOutputStream(outputStream)
     ssc
@@ -146,7 +171,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
     val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions)
     val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions)
     val operatedStream = operation(inputStream1, inputStream2)
-    val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[W]] with SynchronizedBuffer[Seq[W]])
+    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
+      new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
     ssc.registerInputStream(inputStream1)
     ssc.registerInputStream(inputStream2)
     ssc.registerOutputStream(outputStream)
@@ -157,18 +183,37 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
    * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
    * returns the collected output. It will wait until `numExpectedOutput` number of
    * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
+   *
+   * Returns a sequence of items for each RDD.
    */
   def runStreams[V: ClassManifest](
       ssc: StreamingContext,
       numBatches: Int,
       numExpectedOutput: Int
     ): Seq[Seq[V]] = {
+    // Flatten each RDD into a single Seq
+    runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq)
+  }
+
+  /**
+   * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
+   * returns the collected output. It will wait until `numExpectedOutput` number of
+   * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
+   *
+   * Returns a sequence of RDDs. Each RDD is represented as several sequences of items,
+   * each representing one partition.
+   */
+  def runStreamsWithPartitions[V: ClassManifest](
+      ssc: StreamingContext,
+      numBatches: Int,
+      numExpectedOutput: Int
+    ): Seq[Seq[Seq[V]]] = {
     assert(numBatches > 0, "Number of batches to run stream computation is zero")
     assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
     logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)

     // Get the output buffer
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     val output = outputStream.output

     try {
...
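With this change, `runStreams` keeps its original signature by delegating to `runStreamsWithPartitions` and flattening, so existing tests compile unchanged while new tests can inspect partitions. The relationship between the two return shapes, as a conceptual sketch (a streaming context can only be run once, so this is the equivalence, not a literal back-to-back call):

// Conceptual sketch: the two return shapes and how one derives from the other.
val byPartition: Seq[Seq[Seq[Int]]] = runStreamsWithPartitions[Int](ssc, 3, 3)
val flat: Seq[Seq[Int]] = byPartition.map(_.flatten.toSeq)
// `flat` is exactly what runStreams(ssc, 3, 3) would have returned.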