diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java index 530ee2ea7967af851c4573c7f9aa9399c08e629b..2ce714cd0be75bff89e35a5a5af14224a290f5dd 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java +++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java @@ -24,7 +24,7 @@ import scala.runtime.AbstractFunction2; import java.io.Serializable; /** - * A two-argument function that takes arguments of type T1 and T2 and returns an R. + * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R. */ public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R> implements Serializable { diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala index 8e8bbeb99829c9aed74c2b99048baae51af791af..d314dbdf1d980c82f4be6263bcce13746eb4a7aa 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala +++ b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala @@ -20,8 +20,8 @@ package org.apache.spark.api.java.function import scala.runtime.AbstractFunction3 /** - * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the - * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply + * Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the + * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply * isn't marked to allow that). */ private[spark] abstract class WrappedFunction3[T1, T2, T3, R] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index ee351daa602f9115caae332260684968c13b526e..38e34795b49f598bc136c6492c88996f4c42a652 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -479,7 +479,7 @@ abstract class DStream[T: ClassManifest] ( /** * Apply a function to each RDD in this DStream. This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: RDD[T] => Unit) { this.foreach((r: RDD[T], t: Time) => foreachFunc(r)) @@ -487,7 +487,7 @@ abstract class DStream[T: ClassManifest] ( /** * Apply a function to each RDD in this DStream. This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: (RDD[T], Time) => Unit) { val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) @@ -497,7 +497,7 @@ abstract class DStream[T: ClassManifest] ( /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. 
*/ def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) } @@ -505,7 +505,7 @@ abstract class DStream[T: ClassManifest] ( /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) @@ -518,8 +518,8 @@ abstract class DStream[T: ClassManifest] ( } /** - * Return a new DStream in which each RDD is generated by applying a function on RDDs - * of DStreams stream1 and stream2. + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. */ def transformWith[U: ClassManifest, V: ClassManifest]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] @@ -529,8 +529,8 @@ abstract class DStream[T: ClassManifest] ( } /** - * Return a new DStream in which each RDD is generated by applying a function on RDDs - * of DStreams stream1 and stream2. + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. */ def transformWith[U: ClassManifest, V: ClassManifest]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index c319433e54b73c2da410c555fc7d80a5f84c6f0f..8c12fd11efcafc3d0daa87e39f9d75d6d6a1dd90 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream} -import org.apache.spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream} +import org.apache.spark.streaming.dstream.{ShuffledDStream} import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream} import org.apache.spark.{Partitioner, HashPartitioner} @@ -359,7 +359,7 @@ extends Serializable { } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then @@ -398,11 +398,18 @@ extends Serializable { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner) } - + /** + * Return a new DStream by applying a map function to the value of each key-value pair in + * 'this' DStream without changing the key. + */ def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = { new MapValuedDStream[K, V, U](self, mapValuesFunc) } + /** + * Return a new DStream by applying a flatMap function to the value of each key-value pair in + * 'this' DStream without changing the key.
+ */ def flatMapValues[U: ClassManifest]( flatMapValuesFunc: V => TraversableOnce[U] ): DStream[(K, U)] = { @@ -410,9 +417,8 @@ extends Serializable { } /** - * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { @@ -420,31 +426,29 @@ extends Serializable { } /** - * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. Partitioner is used to partition each generated RDD. + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def cogroup[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. */ def cogroup[W: ClassManifest]( other: DStream[(K, W)], partitioner: Partitioner ): DStream[(K, (Seq[V], Seq[W]))] = { - - val cgd = new CoGroupedDStream[K]( - Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]), - partitioner - ) - val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)( - classManifest[K], - Manifests.seqSeqManifest + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) ) - pdfs.mapValues { - case Seq(vs, ws) => - (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) - } } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = { @@ -452,7 +456,15 @@ extends Serializable { } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def join[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { + join[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ def join[W: ClassManifest]( @@ -466,7 +478,7 @@ extends Serializable { } /** - * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and * `other` DStream. 
Hash partitioning is used to generate the RDDs with Spark's default * number of partitions. */ @@ -475,7 +487,19 @@ extends Serializable { } /** - * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def leftOuterJoin[W: ClassManifest]( + other: DStream[(K, W)], + numPartitions: Int + ): DStream[(K, (V, Option[W]))] = { + leftOuterJoin[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control * the partitioning of each RDD. */ @@ -490,7 +514,7 @@ extends Serializable { } /** - * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default * number of partitions. */ @@ -499,7 +523,19 @@ extends Serializable { } /** - * Return new DStream by applying 'right outer join' between RDDs of `this` DStream and + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def rightOuterJoin[W: ClassManifest]( + other: DStream[(K, W)], + numPartitions: Int + ): DStream[(K, (Option[V], W))] = { + rightOuterJoin[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control * the partitioning of each RDD. */ @@ -514,8 +550,8 @@ extends Serializable { } /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval + * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" */ def saveAsHadoopFiles[F <: OutputFormat[K, V]]( prefix: String, @@ -525,8 +561,8 @@ extends Serializable { } /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval + * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" */ def saveAsHadoopFiles( prefix: String, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 1110d770c4e25a440786c9f1e8fd0fafc16bcb7d..09189eadd824e5ace83405d450df8ffbd1f694d6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -121,10 +121,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * this DStream. Applying glom() to an RDD coalesces all elements within each partition into * an array. 
*/ - def glom(): JavaDStream[JList[T]] = + def glom(): JavaDStream[JList[T]] = { new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) + } + - /** Return the StreamingContext associated with this DStream */ + /** Return the [[org.apache.spark.streaming.StreamingContext]] associated with this DStream */ def context(): StreamingContext = dstream.context() /** Return a new DStream by applying a function to all elements of this DStream. */ @@ -239,7 +241,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Apply a function to each RDD in this DStream. This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: JFunction[R, Void]) { dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd))) @@ -247,7 +249,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Apply a function to each RDD in this DStream. This is an output operator, so - * this DStream will be registered as an output stream and therefore materialized. + * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: JFunction2[R, Time, Void]) { dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) @@ -255,7 +257,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassManifest[U] = @@ -267,7 +269,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = { implicit val cm: ClassManifest[U] = @@ -279,7 +281,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { @@ -294,7 +296,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this DStream. + * on each RDD of 'this' DStream. */ def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]): JavaPairDStream[K2, V2] = { @@ -309,7 +311,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this and other DStreams. + * on each RDD of 'this' DStream and 'other' DStream. */ def transformWith[U, W]( other: JavaDStream[U], @@ -326,7 +328,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this and other DStreams. + * on each RDD of 'this' DStream and 'other' DStream. 
*/ def transformWith[U, K2, V2]( other: JavaDStream[U], @@ -345,42 +347,42 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this and other DStreams. + * on each RDD of 'this' DStream and 'other' DStream. */ - def transformWith[K, V, W]( - other: JavaPairDStream[K, V], - transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaRDD[W]] + def transformWith[K2, V2, W]( + other: JavaPairDStream[K2, V2], + transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]] ): JavaDStream[W] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmk2: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv2: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] implicit val cmw: ClassManifest[W] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] - def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[W] = + def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd - dstream.transformWith[(K, V), W](other.dstream, scalaTransform(_, _, _)) + dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _)) } /** * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of this and other DStreams. + * on each RDD of 'this' DStream and 'other' DStream. */ - def transformWith[K, V, K2, V2]( - other: JavaPairDStream[K, V], - transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]] - ): JavaPairDStream[K2, V2] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + def transformWith[K2, V2, K3, V3]( + other: JavaPairDStream[K2, V2], + transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]] + ): JavaPairDStream[K3, V3] = { implicit val cmk2: ClassManifest[K2] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] implicit val cmv2: ClassManifest[V2] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] - def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[(K2, V2)] = + implicit val cmk3: ClassManifest[K3] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K3]] + implicit val cmv3: ClassManifest[V3] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V3]] + def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] = transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd - dstream.transformWith[(K, V), (K2, V2)](other.dstream, scalaTransform(_, _, _)) + dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _)) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 821db46fff0c311a90b7ac4cac0a4e9f98c6f5a6..309c0fa24b83ba0601f8b3f8650ed818e119ae39 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3} import org.apache.spark.Partitioner import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} @@ -148,7 +148,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in [[PairRDDFunctions]] for more + * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more * information. */ def combineByKey[C](createCombiner: JFunction[V, C], @@ -413,7 +413,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. * @param updateFunc State update function. If `this` function returns None, then @@ -428,7 +428,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param updateFunc State update function. If `this` function returns None, then @@ -436,15 +436,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param numPartitions Number of partitions of each RDD in the new DStream. * @tparam S State type */ - def updateStateByKey[S: ClassManifest]( + def updateStateByKey[S]( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], numPartitions: Int) : JavaPairDStream[K, S] = { + implicit val cm: ClassManifest[S] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions) } /** - * Create a new "state" DStream where the state for each key is updated by applying + * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then @@ -452,19 +454,30 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. 
* @tparam S State type */ - def updateStateByKey[S: ClassManifest]( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], partitioner: Partitioner ): JavaPairDStream[K, S] = { + def updateStateByKey[S]( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], partitioner: Partitioner ): JavaPairDStream[K, S] = { + implicit val cm: ClassManifest[S] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner) } + + /** + * Return a new DStream by applying a map function to the value of each key-value pair in + * 'this' DStream without changing the key. + */ def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { implicit val cm: ClassManifest[U] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] dstream.mapValues(f) } + /** + * Return a new DStream by applying a flatMap function to the value of each key-value pair in + * 'this' DStream without changing the key. + */ def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = { import scala.collection.JavaConverters._ def fn = (x: V) => f.apply(x).asScala @@ -474,9 +487,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { @@ -486,20 +498,35 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. Partitioner is used to partition each generated RDD. + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (JList[V], JList[W])] = { + def cogroup[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.cogroup(other.dstream, numPartitions) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. + */ + def cogroup[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (JList[V], JList[W])] = { implicit val cm: ClassManifest[W] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] dstream.cogroup(other.dstream, partitioner) - .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { @@ -509,19 +536,32 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.join(other.dstream, numPartitions) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ - def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (V, W)] = { + def join[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (V, W)] = { implicit val cm: ClassManifest[W] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] dstream.join(other.dstream, partitioner) } /** - * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream.. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. */ def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = { implicit val cm: ClassManifest[W] = @@ -531,11 +571,28 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def leftOuterJoin[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (V, Optional[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream. * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ - def leftOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (V, Optional[W])] = { + def leftOuterJoin[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (V, Optional[W])] = { implicit val cm: ClassManifest[W] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = dstream.leftOuterJoin(other.dstream, partitioner) @@ -543,8 +600,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream..
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. */ def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = { implicit val cm: ClassManifest[W] = @@ -554,11 +612,29 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def rightOuterJoin[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (Optional[V], W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. */ - def rightOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (Optional[V], W)] = { + def rightOuterJoin[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (Optional[V], W)] = { implicit val cm: ClassManifest[W] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = dstream.rightOuterJoin(other.dstream, partitioner) @@ -640,9 +716,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } object JavaPairDStream { - implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) - :JavaPairDStream[K, V] = + implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) = { new JavaPairDStream[K, V](dstream) + } def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { implicit val cmk: ClassManifest[K] = diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala deleted file mode 100644 index 4eddc755b97f46e0e055917c3815363ae8f8b789..0000000000000000000000000000000000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark.Partitioner -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.CoGroupedRDD -import org.apache.spark.streaming.{Time, DStream, Duration} - -private[streaming] -class CoGroupedDStream[K : ClassManifest]( - parents: Seq[DStream[(K, _)]], - partitioner: Partitioner - ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) { - - if (parents.length == 0) { - throw new IllegalArgumentException("Empty array of parents") - } - - if (parents.map(_.ssc).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different StreamingContexts") - } - - if (parents.map(_.slideDuration).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different slide times") - } - - override def dependencies = parents.toList - - override def slideDuration: Duration = parents.head.slideDuration - - override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { - val part = partitioner - val rdds = parents.flatMap(_.getOrCompute(validTime)) - if (rdds.size > 0) { - val q = new CoGroupedRDD[K](rdds, part) - Some(q) - } else { - None - } - } - -} diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 9f885f07f2b4c1727260291be67e2108ede4f2fe..16622a3459b620b1748b151ddc1bf10bd5520587 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -320,23 +320,104 @@ public class JavaAPISuite implements Serializable { Arrays.asList(9,10,11)); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> transformed = - stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { - @Override - public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { - return in.map(new Function<Integer, Integer>() { - @Override - public Integer call(Integer i) throws Exception { - return i + 2; - } - }); - }}); + JavaDStream<Integer> transformed = stream.transform( + new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + @Override + public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + return in.map(new Function<Integer, Integer>() { + @Override + public Integer call(Integer i) throws Exception { + return i + 2; + } + }); + } + } + ); + JavaTestUtils.attachTestOutputStream(transformed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); assertOrderInvariantEquals(expected, result); } + @Test + public void testVariousTransform() { + // tests whether all variations of transform can be called from Java + + List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1)); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + + List<List<Tuple2<String, Integer>>> pairInputData = + Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); + + JavaDStream<Integer> transformed1 = stream.transform( + new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> transformed2 = stream.transform( + new 
Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, Integer> transformed3 = stream.transform( + new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, Integer> transformed4 = stream.transform( + new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> pairTransformed1 = pairStream.transform( + new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> pairTransformed2 = pairStream.transform( + new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, String> pairTransformed3 = pairStream.transform( + new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, String> pairTransformed4 = pairStream.transform( + new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + return null; + } + } + ); + + } + @Test public void testTransformWith() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( @@ -374,10 +455,18 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith( pairStream2, - new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, JavaPairRDD<String, Tuple2<String, String>>>() { - @Override - public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> stringStringJavaPairRDD, JavaPairRDD<String, String> stringStringJavaPairRDD2, Time time) throws Exception { - return stringStringJavaPairRDD.join(stringStringJavaPairRDD2); + new Function3 < + JavaPairRDD<String, String>, + JavaPairRDD<String, String>, + Time, + JavaPairRDD<String, Tuple2<String, String>> + >() { + @Override public JavaPairRDD<String, Tuple2<String, String>> call( + JavaPairRDD<String, String> rdd1, + JavaPairRDD<String, String> rdd2, + Time time + ) throws Exception { + return rdd1.join(rdd2); } } ); @@ -389,6 +478,106 @@ public class JavaAPISuite implements Serializable { } + @Test + public void testVariousTransformWith() { + // tests whether all variations of transformWith can be called from Java + + List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1)); + List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x")); + JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1); + JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); + + List<List<Tuple2<String, Integer>>> pairInputData1 = + Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + List<List<Tuple2<Double, 
Character>>> pairInputData2 = + Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x'))); + JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); + JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); + + JavaDStream<Double> transformed1 = stream1.transformWith( + stream2, + new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> transformed2 = stream1.transformWith( + pairStream1, + new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> transformed3 = stream1.transformWith( + stream2, + new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> transformed4 = stream1.transformWith( + pairStream1, + new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> pairTransformed1 = pairStream1.transformWith( + stream2, + new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith( + pairStream1, + new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith( + stream2, + new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + + JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith( + pairStream2, + new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception { + return null; + } + } + ); + } + @Test public void testFlatMap() { List<List<String>> inputData = Arrays.asList(