diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index a191321d9105801d70c76c0a557323a897b1a1ae..60f228b8adae4d09c4851a3026c1cf70a709f53b 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -28,16 +28,15 @@ object PageViewStream {
     // Create a NetworkInputDStream on target host:port and convert each line to a PageView
     val pageViews = ssc.networkTextStream(host, port)
-      .flatMap(_.split("\n"))
-      .map(PageView.fromString(_))
+                       .flatMap(_.split("\n"))
+                       .map(PageView.fromString(_))
 
     // Return a count of views per URL seen in each batch
-    val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
+    val pageCounts = pageViews.map(view => view.url).countByValue()
 
     // Return a sliding window of page views per URL in the last ten seconds
-    val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
-                                     .window(Seconds(10), Seconds(2))
-                                     .countByKey()
+    val slidingPageCounts = pageViews.map(view => view.url)
+                                     .countByValueAndWindow(Seconds(10), Seconds(2))
 
     // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 6abec9e6bec0b279a9363d8de57704a1742d10ea..ce42b742d77e447605ac2de3cbd2ab4e7a27e5c8 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -441,6 +441,15 @@ abstract class DStream[T: ClassManifest] (
    */
  def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
 
+  /**
+   * Return a new DStream in which each RDD contains the counts of each distinct value in
+   * the corresponding RDD of this DStream. Hash partitioning is used to generate
+   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+   * `numPartitions` not specified).
+   */
+  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
+    this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+
  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * this DStream will be registered as an output stream and therefore materialized.
@@ -494,14 +503,16 @@ abstract class DStream[T: ClassManifest] (
  }
 
  /**
-   * Return a new DStream which is computed based on windowed batches of this DStream.
-   * The new DStream generates RDDs with the same interval as this DStream.
+   * Return a new DStream in which each RDD contains all the elements seen in a
+   * sliding window of time over this DStream. The new DStream generates RDDs with
+   * the same interval as this DStream.
   * @param windowDuration width of the window; must be a multiple of this DStream's interval.
   */
  def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
 
  /**
-   * Return a new DStream which is computed based on windowed batches of this DStream.
+   * Return a new DStream in which each RDD contains all the elements seen in a
+   * sliding window of time over this DStream.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -512,19 +523,15 @@ abstract class DStream[T: ClassManifest] (
    new WindowedDStream(this, windowDuration, slideDuration)
  }
 
-  /**
-   * Return a new DStream which computed based on tumbling window on this DStream.
-   * This is equivalent to window(batchTime, batchTime).
-   * @param batchDuration tumbling window duration; must be a multiple of this DStream's
-   *                      batching interval
-   */
-  def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration)
-
  /**
   * Return a new DStream in which each RDD has a single element generated by reducing all
-   * elements in a window over this DStream. windowDuration and slideDuration are as defined
-   * in the window() operation. This is equivalent to
-   * window(windowDuration, slideDuration).reduce(reduceFunc)
+   * elements in a sliding window over this DStream.
+   * @param reduceFunc associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
   */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
@@ -534,6 +541,22 @@ abstract class DStream[T: ClassManifest] (
    this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
  }
 
+  /**
+   * Return a new DStream in which each RDD has a single element generated by reducing all
+   * elements in a sliding window over this DStream. However, the reduction is done incrementally
+   * using the old window's reduced value:
+   * 1. reduce the new values that entered the window (e.g., adding new counts)
+   * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByWindow without an "inverse reduce" function.
+   * However, it is applicable only to "invertible reduce functions".
+   * @param reduceFunc associative reduce function
+   * @param invReduceFunc inverse reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      invReduceFunc: (T, T) => T,
@@ -547,13 +570,46 @@ abstract class DStream[T: ClassManifest] (
 
  /**
   * Return a new DStream in which each RDD has a single element generated by counting the number
-   * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
-   * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
+   * of elements in a sliding window over this DStream. Hash partitioning is used to generate
+   * the RDDs with Spark's default number of partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
   */
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
    this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
  }
 
+  /**
+   * Return a new DStream in which each RDD contains the counts of each distinct value seen in
+   * a sliding window over this DStream. Hash partitioning is used to generate
+   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
+   * `numPartitions` not specified).
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param numPartitions  number of partitions of each RDD in the new DStream.
+   */
+  def countByValueAndWindow(
+      windowDuration: Duration,
+      slideDuration: Duration,
+      numPartitions: Int = ssc.sc.defaultParallelism
+    ): DStream[(T, Long)] = {
+
+    this.map(x => (x, 1L)).reduceByKeyAndWindow(
+      (x: Long, y: Long) => x + y,
+      (x: Long, y: Long) => x - y,
+      windowDuration,
+      slideDuration,
+      numPartitions,
+      (x: (T, Long)) => x._2 != 0L
+    )
+  }
+
  /**
   * Return a new DStream by unifying data of another DStream with this DStream.
   * @param that Another DStream having the same slideDuration as this DStream.
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 835b20ae08e3aee27d6072b055b092088a5f8406..5127db3bbcd65df9e7924e12759b56b547e5dea4 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -94,14 +94,6 @@ extends Serializable {
    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
  }
 
-  /**
-   * Return a new DStream by counting the number of values of each key in each RDD. Hash
-   * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions.
-   */
-  def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = {
-    self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
-  }
-
  /**
   * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
   * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
@@ -211,7 +203,7 @@ extends Serializable {
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
-   * @param numPartitions  Number of partitions of each RDD in the new DStream.
+   * @param numPartitions  number of partitions of each RDD in the new DStream.
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
@@ -248,10 +240,10 @@ extends Serializable {
 
  /**
   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
-   * The reduced value of over a new window is calculated using the old window's reduce value :
+   * The reduced value over a new window is calculated using the old window's reduced value:
   * 1. reduce the new values that entered the window (e.g., adding new counts)
   * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
-   * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
   * However, it is applicable to only "invertible reduce functions".
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param reduceFunc associative reduce function
@@ -281,10 +273,10 @@ extends Serializable {
 
  /**
   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
-   * The reduced value of over a new window is calculated using the old window's reduce value :
+   * The reduced value over a new window is calculated using the old window's reduced value:
   * 1. reduce the new values that entered the window (e.g., adding new counts)
   * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
-   * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
   * However, it is applicable to only "invertible reduce functions".
   * @param reduceFunc associative reduce function
   * @param invReduceFunc inverse reduce function
@@ -315,31 +307,6 @@ extends Serializable {
    )
  }
 
-  /**
-   * Return a new DStream by counting the number of values for each key over a window.
-   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   * @param numPartitions  Number of partitions of each RDD in the new DStream.
-   */
-  def countByKeyAndWindow(
-      windowDuration: Duration,
-      slideDuration: Duration,
-      numPartitions: Int = self.ssc.sc.defaultParallelism
-    ): DStream[(K, Long)] = {
-
-    self.map(x => (x._1, 1L)).reduceByKeyAndWindow(
-      (x: Long, y: Long) => x + y,
-      (x: Long, y: Long) => x - y,
-      windowDuration,
-      slideDuration,
-      numPartitions
-    )
-  }
-
  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
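
Migration note: callers of the removed countByKey / countByKeyAndWindow can recover the same per-key counts by projecting out the keys and counting them as values. A minimal sketch against the API in this patch (the wrapper object and the pairStream parameter are hypothetical, not part of this change):

    import spark.streaming.{DStream, Seconds}

    object CountByKeyMigration {
      // before: pairStream.countByKey()
      def perKeyCounts(pairStream: DStream[(String, String)]): DStream[(String, Long)] =
        pairStream.map(_._1).countByValue()

      // before: pairStream.countByKeyAndWindow(Seconds(10), Seconds(2))
      def windowedPerKeyCounts(pairStream: DStream[(String, String)]): DStream[(String, Long)] =
        pairStream.map(_._1).countByValueAndWindow(Seconds(10), Seconds(2))
    }

The rewrites preserve the removed methods' semantics (countByKey only ever counted occurrences of each key and ignored the values), except that countByValueAndWindow additionally drops keys whose windowed count has fallen back to zero.
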
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 2e7466b16c93edb162ac1027a0a0e75dfff7b850..30985b4ebc13b2db14a86a6f6e3b14049efccbfd 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -36,7 +36,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
  def cache(): JavaDStream[T] = dstream.cache()
 
  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
-  def persist(): JavaDStream[T] = dstream.cache()
+  def persist(): JavaDStream[T] = dstream.persist()
 
  /** Persist the RDDs of this DStream with the given storage level */
  def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
@@ -50,33 +50,26 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
  }
 
  /**
-   * Return a new DStream which is computed based on windowed batches of this DStream.
-   * The new DStream generates RDDs with the same interval as this DStream.
+   * Return a new DStream in which each RDD contains all the elements seen in a
+   * sliding window of time over this DStream. The new DStream generates RDDs with
+   * the same interval as this DStream.
   * @param windowDuration width of the window; must be a multiple of this DStream's interval.
-   * @return
   */
  def window(windowDuration: Duration): JavaDStream[T] =
    dstream.window(windowDuration)
 
  /**
-   * Return a new DStream which is computed based on windowed batches of this DStream.
-   * @param windowDuration duration (i.e., width) of the window;
-   *                       must be a multiple of this DStream's interval
+   * Return a new DStream in which each RDD contains all the elements seen in a
+   * sliding window of time over this DStream.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's interval
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
   */
  def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] =
    dstream.window(windowDuration, slideDuration)
 
-  /**
-   * Return a new DStream which computed based on tumbling window on this DStream.
-   * This is equivalent to window(batchDuration, batchDuration).
-   * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
-   */
-  def tumble(batchDuration: Duration): JavaDStream[T] =
-    dstream.tumble(batchDuration)
-
  /**
   * Return a new DStream by unifying data of another DStream with this DStream.
   * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
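
Migration note: tumble has no direct replacement; per its removed doc comment it was shorthand for window(batchDuration, batchDuration), so a tumbling window is now written as a window whose slide equals its width. A minimal sketch (object and parameter names hypothetical):

    import spark.streaming.{DStream, Seconds}

    object TumbleMigration {
      // before: lines.tumble(Seconds(2))
      def tumbled(lines: DStream[String]): DStream[String] =
        lines.window(Seconds(2), Seconds(2))   // width == slide => non-overlapping windows
    }
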
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index b93cb7865aef03af96e1bb9547c99786e4980198..1c1ba05ff98b5821e685e2ab5aa9504ed65e6378 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -33,6 +33,26 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
   */
  def count(): JavaDStream[JLong] = dstream.count()
 
+  /**
+   * Return a new DStream in which each RDD contains the counts of each distinct value in
+   * the corresponding RDD of this DStream. Hash partitioning is used to generate the RDDs with
+   * Spark's default number of partitions.
+   */
+  def countByValue(): JavaPairDStream[T, JLong] = {
+    JavaPairDStream.scalaToJavaLong(dstream.countByValue())
+  }
+
+  /**
+   * Return a new DStream in which each RDD contains the counts of each distinct value in
+   * the corresponding RDD of this DStream. Hash partitioning is used to generate the RDDs
+   * with `numPartitions` partitions.
+   * @param numPartitions number of partitions of each RDD in the new DStream.
+   */
+  def countByValue(numPartitions: Int): JavaPairDStream[T, JLong] = {
+    JavaPairDStream.scalaToJavaLong(dstream.countByValue(numPartitions))
+  }
+
  /**
   * Return a new DStream in which each RDD has a single element generated by counting the number
   * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
@@ -42,6 +62,39 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
    dstream.countByWindow(windowDuration, slideDuration)
  }
 
+  /**
+   * Return a new DStream in which each RDD contains the counts of each distinct value seen in
+   * a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+   * Spark's default number of partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
+  def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration)
+    : JavaPairDStream[T, JLong] = {
+    JavaPairDStream.scalaToJavaLong(
+      dstream.countByValueAndWindow(windowDuration, slideDuration))
+  }
+
+  /**
+   * Return a new DStream in which each RDD contains the counts of each distinct value seen in
+   * a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
+   * `numPartitions` partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param numPartitions  number of partitions of each RDD in the new DStream.
+   */
+  def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
+    : JavaPairDStream[T, JLong] = {
+    JavaPairDStream.scalaToJavaLong(
+      dstream.countByValueAndWindow(windowDuration, slideDuration, numPartitions))
+  }
+
  /**
   * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
   * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
@@ -114,8 +167,38 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
 
  /**
   * Return a new DStream in which each RDD has a single element generated by reducing all
-   * elements in a window over this DStream. windowDuration and slideDuration are as defined in the
-   * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
+   * elements in a sliding window over this DStream.
+   * @param reduceFunc associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
+  def reduceByWindow(
+      reduceFunc: JFunction2[T, T, T],
+      windowDuration: Duration,
+      slideDuration: Duration
+    ): JavaDStream[T] = {
+    dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
+  }
+
+  /**
+   * Return a new DStream in which each RDD has a single element generated by reducing all
+   * elements in a sliding window over this DStream. However, the reduction is done incrementally
+   * using the old window's reduced value:
+   * 1. reduce the new values that entered the window (e.g., adding new counts)
+   * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByWindow without an "inverse reduce" function.
+   * However, it is applicable only to "invertible reduce functions".
+   * @param reduceFunc associative reduce function
+   * @param invReduceFunc inverse reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
   */
  def reduceByWindow(
      reduceFunc: JFunction2[T, T, T],
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index 048e10b69c685dd41b48581c6962c55b62f263f9..952ca657bf77be3ddb79fa0b3a0d3c5bf39e891a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -33,7 +33,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
  def cache(): JavaPairDStream[K, V] = dstream.cache()
 
  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
-  def persist(): JavaPairDStream[K, V] = dstream.cache()
+  def persist(): JavaPairDStream[K, V] = dstream.persist()
 
  /** Persist the RDDs of this DStream with the given storage level */
  def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
@@ -66,14 +66,6 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
  def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] =
    dstream.window(windowDuration, slideDuration)
 
-  /**
-   * Return a new DStream which computed based on tumbling window on this DStream.
-   * This is equivalent to window(batchDuration, batchDuration).
-   * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
-   */
-  def tumble(batchDuration: Duration): JavaPairDStream[K, V] =
-    dstream.tumble(batchDuration)
-
  /**
   * Return a new DStream by unifying data of another DStream with this DStream.
   * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
@@ -148,23 +140,6 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
  }
 
-  /**
-   * Return a new DStream by counting the number of values of each key in each RDD. Hash
-   * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions.
-   */
-  def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = {
-    JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions));
-  }
-
-
-  /**
-   * Return a new DStream by counting the number of values of each key in each RDD. Hash
-   * partitioning is used to generate the RDDs with the default number of partitions.
-   */
-  def countByKey(): JavaPairDStream[K, JLong] = {
-    JavaPairDStream.scalaToJavaLong(dstream.countByKey());
-  }
-
  /**
   * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
   * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
@@ -402,35 +377,6 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    )
  }
 
-  /**
-   * Create a new DStream by counting the number of values for each key over a window.
-   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   */
-  def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
-    : JavaPairDStream[K, JLong] = {
-    JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
-  }
-
-  /**
-   * Create a new DStream by counting the number of values for each key over a window.
-   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   * @param numPartitions  Number of partitions of each RDD in the new DStream.
-   */
-  def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
-    : JavaPairDStream[K, Long] = {
-    dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions)
-  }
-
  private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]):
    (Seq[V], Option[S]) => Option[S] = {
    val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => {
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 783a393a8ffef1b0aa248d2b060a7853504ea926..7bea0b1fc4e73af8559a115fbe8a5716a2717664 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -134,29 +134,6 @@ public class JavaAPISuite implements Serializable {
    assertOrderInvariantEquals(expected, result);
  }
 
-  @Test
-  public void testTumble() {
-    List<List<Integer>> inputData = Arrays.asList(
-        Arrays.asList(1,2,3),
-        Arrays.asList(4,5,6),
-        Arrays.asList(7,8,9),
-        Arrays.asList(10,11,12),
-        Arrays.asList(13,14,15),
-        Arrays.asList(16,17,18));
-
-    List<List<Integer>> expected = Arrays.asList(
-        Arrays.asList(1,2,3,4,5,6),
-        Arrays.asList(7,8,9,10,11,12),
-        Arrays.asList(13,14,15,16,17,18));
-
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream windowed = stream.tumble(new Duration(2000));
-    JavaTestUtils.attachTestOutputStream(windowed);
-    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3);
-
-    assertOrderInvariantEquals(expected, result);
-  }
-
  @Test
  public void testFilter() {
    List<List<String>> inputData = Arrays.asList(
@@ -584,24 +561,26 @@ public class JavaAPISuite implements Serializable {
  }
 
  @Test
-  public void testCountByKey() {
-    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+  public void testCountByValue() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("hello", "world"),
+      Arrays.asList("hello", "moon"),
+      Arrays.asList("hello"));
 
    List<List<Tuple2<String, Long>>> expected = Arrays.asList(
-        Arrays.asList(
-            new Tuple2<String, Long>("california", 2L),
-            new Tuple2<String, Long>("new york", 2L)),
-        Arrays.asList(
-            new Tuple2<String, Long>("california", 2L),
-            new Tuple2<String, Long>("new york", 2L)));
-
-    JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
-        ssc, inputData, 1);
-    JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+      Arrays.asList(
+        new Tuple2<String, Long>("hello", 1L),
+        new Tuple2<String, Long>("world", 1L)),
+      Arrays.asList(
+        new Tuple2<String, Long>("hello", 1L),
+        new Tuple2<String, Long>("moon", 1L)),
+      Arrays.asList(
+        new Tuple2<String, Long>("hello", 1L)));
 
-    JavaPairDStream<String, Long> counted = pairStream.countByKey();
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Long> counted = stream.countByValue();
    JavaTestUtils.attachTestOutputStream(counted);
-    List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+    List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
    Assert.assertEquals(expected, result);
  }
@@ -712,26 +691,28 @@ public class JavaAPISuite implements Serializable {
  }
 
  @Test
-  public void testCountByKeyAndWindow() {
-    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+  public void testCountByValueAndWindow() {
+    List<List<String>> inputData = Arrays.asList(
Arrays.asList("hello", "world"), + Arrays.asList("hello", "moon"), + Arrays.asList("hello")); List<List<Tuple2<String, Long>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Long>("california", 2L), - new Tuple2<String, Long>("new york", 2L)), + new Tuple2<String, Long>("hello", 1L), + new Tuple2<String, Long>("world", 1L)), Arrays.asList( - new Tuple2<String, Long>("california", 4L), - new Tuple2<String, Long>("new york", 4L)), + new Tuple2<String, Long>("hello", 2L), + new Tuple2<String, Long>("world", 1L), + new Tuple2<String, Long>("moon", 1L)), Arrays.asList( - new Tuple2<String, Long>("california", 2L), - new Tuple2<String, Long>("new york", 2L))); + new Tuple2<String, Long>("hello", 2L), + new Tuple2<String, Long>("moon", 1L))); - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, Long> counted = - pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); + stream.countByValueAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(counted); List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index 12388b888737b4a01566d8e25b177c4ebad6530b..1e86cf49bb75a3dedbbeb24cbddb0eeac41c1043 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -24,7 +24,7 @@ class BasicOperationsSuite extends TestSuiteBase { ) } - test("flatmap") { + test("flatMap") { val input = Seq(1 to 4, 5 to 8, 9 to 12) testOperation( input, @@ -88,6 +88,23 @@ class BasicOperationsSuite extends TestSuiteBase { ) } + test("count") { + testOperation( + Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4), + (s: DStream[Int]) => s.count(), + Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L)) + ) + } + + test("countByValue") { + testOperation( + Seq(1 to 1, Seq(1, 1, 1), 1 to 2, Seq(1, 1, 2, 2)), + (s: DStream[Int]) => s.countByValue(), + Seq(Seq((1, 1L)), Seq((1, 3L)), Seq((1, 1L), (2, 1L)), Seq((2, 2L), (1, 2L))), + true + ) + } + test("mapValues") { testOperation( Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), @@ -206,7 +223,7 @@ class BasicOperationsSuite extends TestSuiteBase { case _ => Option(stateObj) } } - s.map(_ -> 1).updateStateByKey[StateObject](updateFunc).mapValues(_.counter) + s.map(x => (x, 1)).updateStateByKey[StateObject](updateFunc).mapValues(_.counter) } testOperation(inputData, updateStateOperation, outputData, true) diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index e6ac7b35aa9a08cf16d0cf9ef27ce6273fb2810c..f8380af3313015164ff6db5571957f382e51ade4 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -236,14 +236,14 @@ class WindowOperationsSuite extends TestSuiteBase { testOperation(input, operation, expectedOutput, numBatches, true) } - test("countByKeyAndWindow") { - val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20))) + test("countByValueAndWindow") { + val input = Seq(Seq("a"), Seq("b", "b"), Seq("a", "b")) val 
    val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
    val windowDuration = Seconds(2)
    val slideDuration = Seconds(1)
    val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
-    val operation = (s: DStream[(String, Int)]) => {
-      s.countByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
+    val operation = (s: DStream[String]) => {
+      s.countByValueAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
    }
    testOperation(input, operation, expectedOutput, numBatches, true)
  }
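
Note on the expected counts above: countByValueAndWindow maps each element x to (x, 1L) and applies the incremental reduceByKeyAndWindow added in DStream.scala, adding counts that enter the window, subtracting counts that leave it, and filtering out pairs whose count has returned to zero. A hand-expanded sketch of the equivalent plumbing (wrapper object and parameter names hypothetical):

    import spark.streaming.{DStream, Seconds}
    import spark.streaming.StreamingContext._   // implicit conversion to PairDStreamFunctions

    object WindowedCounts {
      // roughly what s.countByValueAndWindow(Seconds(2), Seconds(1)) does, minus the
      // (x: (T, Long)) => x._2 != 0L filter that discards stale zero counts
      def windowedCounts(words: DStream[String]): DStream[(String, Long)] =
        words.map(x => (x, 1L)).reduceByKeyAndWindow(
          (x: Long, y: Long) => x + y,   // reduce values entering the window
          (x: Long, y: Long) => x - y,   // "inverse reduce" values leaving the window
          Seconds(2),
          Seconds(1))
    }

With the test's input Seq(Seq("a"), Seq("b", "b"), Seq("a", "b")) and a 2-second window sliding every 1-second batch, the third window covers batches 2 and 3: "a" appears once (batch 3) and "b" three times (twice in batch 2, once in batch 3), giving Seq(("a", 1), ("b", 3)) as expected.
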