diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index dd3eed8affe39b33e6b86bfb479c5ae8621ab122..70c7474a936dc62456cb0b8c159ed373925b4f80 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -27,7 +27,7 @@ object Bagel extends Logging {
 
   /**
    * Runs a Bagel program.
-   * @param sc [[org.apache.spark.SparkContext]] to use for the program.
+   * @param sc org.apache.spark.SparkContext to use for the program.
    * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
    *                 Key will be the vertex id.
    * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
@@ -38,10 +38,10 @@ object Bagel extends Logging {
   * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
    *                   after each superstep and provides the result to each vertex in the next
    *                   superstep.
-   * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
+   * @param partitioner org.apache.spark.Partitioner partitions values by key
    * @param numPartitions number of partitions across which to split the graph.
    *                      Default is the default parallelism of the SparkContext
-   * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
+   * @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of
    *                     intermediate RDDs in each superstep. Defaults to caching in memory.
    * @param compute function that takes a Vertex, optional set of (possibly combined) messages to
    *                the Vertex, optional Aggregator and the current superstep,
@@ -131,7 +131,7 @@ object Bagel extends Logging {
 
   /**
    * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
-   * [[org.apache.spark.HashPartitioner]] and default storage level
+   * org.apache.spark.HashPartitioner and default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
@@ -146,7 +146,7 @@ object Bagel extends Logging {
 
   /**
    * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
-   * default [[org.apache.spark.HashPartitioner]]
+   * default org.apache.spark.HashPartitioner
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
@@ -166,7 +166,7 @@ object Bagel extends Logging {
 
   /**
    * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
-   * default [[org.apache.spark.HashPartitioner]],
+   * default org.apache.spark.HashPartitioner,
    * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -180,7 +180,7 @@ object Bagel extends Logging {
 
   /**
    * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
-   * the default [[org.apache.spark.HashPartitioner]]
+   * the default org.apache.spark.HashPartitioner
    * and [[org.apache.spark.bagel.DefaultCombiner]]
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
diff --git a/core/pom.xml b/core/pom.xml
index ebc178a10541a09ae4de01865cbf6447bb91ed93..a333bff28c246ebb49a3479c9493a4139b359917 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -225,7 +225,7 @@
             </goals>
             <configuration>
               <exportAntProperties>true</exportAntProperties>
-              <tasks>
+              <target>
                 <property name="spark.classpath" refid="maven.test.classpath" />
                 <property environment="env" />
                 <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -238,7 +238,7 @@
                     </not>
                   </condition>
                 </fail>
-              </tasks>
+              </target>
             </configuration>
           </execution>
         </executions>
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1f5334f3dbb40bc9923ee38e815f8f126281c988..da778aa851cd2889cc7a4a94cebde33aab8451e1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -351,7 +351,7 @@ class SparkContext(
    * using the older MapReduce API (`org.apache.hadoop.mapred`).
    *
    * @param conf JobConf for setting up the dataset
-   * @param inputFormatClass Class of the [[InputFormat]]
+   * @param inputFormatClass Class of the InputFormat
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
    * @param minSplits Minimum number of Hadoop Splits to generate.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 9d75d7c4ad69ac6953b6eb7db5323df45e26899b..006e2a3335428bebc9b353b0fd6d0d63fe3248ad 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -81,7 +81,7 @@ class JobLogger(val user: String, val logDirName: String)
   /**
    * Create a log file for one job
    * @param jobID ID of the job
-   * @exception FileNotFoundException Fail to create log file
+   * @throws FileNotFoundException Fail to create log file
    */
   protected def createLogWriter(jobID: Int) {
     try {
diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
index bf71882ef770a89c1725f791da119154e7ff5344..c539d2f708f95347475f429ca429660c6f40cfc6 100644
--- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@ -23,9 +23,9 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import com.typesafe.config.Config
 
 /**
- * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
+ * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception
  * This is necessary as Spark Executors are allowed to recover from fatal exceptions
- * (see [[org.apache.spark.executor.Executor]]).
+ * (see org.apache.spark.executor.Executor)
  */
 object IndestructibleActorSystem {
   def apply(name: String, config: Config): ActorSystem =
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 5b0d2c36510b8a6ac27a66f006ee32d9ebd369a4..f837dc7ccc8601e19d510a8c36a6e87bf26f6180 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -19,9 +19,9 @@ package org.apache.spark.util
 
 /**
  * A class for tracking the statistics of a set of numbers (count, mean and variance) in a
- * numerically robust way. Includes support for merging two StatCounters. Based on
- * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
- * Welford and Chan's algorithms for running variance]].
+ * numerically robust way. Includes support for merging two StatCounters. Based on Welford
+ * and Chan's [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance algorithms]]
+ * for running variance.
  *
  * @constructor Initialize the StatCounter with the given values.
  */
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index d437c055f33d436354f0439cfa5178eb6fab1d02..dc4b8f253f25972b7b182066ad67d01c2d67e6a3 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -136,7 +136,7 @@ object Vector {
 
   /**
    * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
-   * between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided.
+   * between 0.0 and 1.0. Optional scala.util.Random number generator can be provided.
    */
   def random(length: Int, random: Random = new XORShiftRandom()) =
     Vector(length, _ => random.nextDouble())
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 20232e9fbb8d0c58e8de8e510eb2670c87ac04c9..aa5079c159830e7b415c8b5979d34ce702ba4785 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -75,8 +75,9 @@ public class JavaAPISuite implements Serializable {
       else if (a < b) return 1;
       else return 0;
     }
-  };
+  }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void sparkContextUnion() {
     // Union of non-specialized JavaRDDs
@@ -148,6 +149,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2, foreachCalls);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void lookup() {
     JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -179,6 +181,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void cogroup() {
     JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -197,6 +200,7 @@ public class JavaAPISuite implements Serializable {
     cogrouped.collect();
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void leftOuterJoin() {
     JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
@@ -243,6 +247,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(33, sum);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void foldByKey() {
     List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -265,6 +270,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void reduceByKey() {
     List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -320,8 +326,8 @@ public class JavaAPISuite implements Serializable {
   public void take() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
     Assert.assertEquals(1, rdd.first().intValue());
-    List<Integer> firstTwo = rdd.take(2);
-    List<Integer> sample = rdd.takeSample(false, 2, 42);
+    rdd.take(2);
+    rdd.takeSample(false, 2, 42);
   }
 
   @Test
@@ -359,8 +365,8 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
     Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);
 
-    Double first = rdd.first();
-    List<Double> take = rdd.take(5);
+    rdd.first();
+    rdd.take(5);
   }
 
   @Test
@@ -438,11 +444,11 @@ public class JavaAPISuite implements Serializable {
         return lengths;
       }
     });
-    Double x = doubles.first();
-    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+    Assert.assertEquals(5.0, doubles.first(), 0.01);
     Assert.assertEquals(11, pairs.count());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void mapsFromPairsToPairs() {
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
@@ -509,6 +515,7 @@ public class JavaAPISuite implements Serializable {
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void persist() {
     JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
@@ -573,6 +580,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void sequenceFile() {
     File tempDir = Files.createTempDir();
@@ -602,6 +610,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(pairs, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void writeWithNewAPIHadoopFile() {
     File tempDir = Files.createTempDir();
@@ -632,6 +641,7 @@ public class JavaAPISuite implements Serializable {
     }).collect().toString());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void readWithNewAPIHadoopFile() throws IOException {
     File tempDir = Files.createTempDir();
@@ -674,6 +684,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void objectFilesOfComplexTypes() {
     File tempDir = Files.createTempDir();
@@ -690,6 +701,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(pairs, readRDD.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void hadoopFile() {
     File tempDir = Files.createTempDir();
@@ -719,6 +731,7 @@ public class JavaAPISuite implements Serializable {
     }).collect().toString());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void hadoopFileCompressed() {
     File tempDir = Files.createTempDir();
@@ -824,7 +837,7 @@ public class JavaAPISuite implements Serializable {
       }
     };
 
-    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+    final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
     rdd.foreach(new VoidFunction<Integer>() {
       public void call(Integer x) {
         floatAccum.add((float) x);
@@ -876,6 +889,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void mapOnPairRDD() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
@@ -900,6 +914,7 @@ public class JavaAPISuite implements Serializable {
   }
 
 
+  @SuppressWarnings("unchecked")
   @Test
   public void collectPartitions() {
     JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
@@ -968,7 +983,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void collectAsMapWithIntArrayValues() {
     // Regression test for SPARK-1040
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
     JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
       @Override
       public Tuple2<Integer, int[]> call(Integer x) throws Exception {
@@ -976,6 +991,6 @@ public class JavaAPISuite implements Serializable {
       }
     });
     pairRDD.collect(); // Works fine
-    Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
+    pairRDD.collectAsMap(); // Used to crash with ClassCastException
   }
 }
diff --git a/pom.xml b/pom.xml
index 6adc670462b257e9a4661e40fdf3c39f85198da9..21060ee69c0412daf9de0e8b3cb0008e5b1fb4c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -592,12 +592,13 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
-          <version>2.5.1</version>
+          <version>3.1</version>
           <configuration>
             <source>${java.version}</source>
             <target>${java.version}</target>
             <encoding>UTF-8</encoding>
             <maxmem>1024m</maxmem>
+            <fork>true</fork>
           </configuration>
         </plugin>
         <plugin>
@@ -612,7 +613,7 @@
         <plugin>
           <groupId>org.scalatest</groupId>
           <artifactId>scalatest-maven-plugin</artifactId>
-          <version>1.0-M2</version>
+          <version>1.0-RC2</version>
           <configuration>
             <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
             <junitxml>.</junitxml>
diff --git a/repl/pom.xml b/repl/pom.xml
index 73597f635b9e028b0b29ed873d94536eb1d84c16..4c5f9720c802aa8e470950862032560eb3110c25 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -98,7 +98,7 @@
             </goals>
             <configuration>
              <exportAntProperties>true</exportAntProperties>
-              <tasks>
+              <target>
                 <property name="spark.classpath" refid="maven.test.classpath" />
                 <property environment="env" />
                 <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -111,7 +111,7 @@
                     </not>
                   </condition>
                 </fail>
-              </tasks>
+              </target>
             </configuration>
           </execution>
         </executions>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 4dcd0e4c51ec386b8802c27887d6d4172b8fadf9..2c7ff87744d7a7e59321c42f32008dbae3eabf01 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -127,7 +127,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /**
    * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
    * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
-   * single sequence to generate the RDDs of the new DStream. [[org.apache.spark.Partitioner]]
+   * single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner
    * is used to control the partitioning of each RDD.
    */
   def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
@@ -151,7 +151,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+   * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
    * thepartitioning of each RDD.
    */
   def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
@@ -161,7 +161,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /**
    * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
    * combineByKey for RDDs. Please refer to combineByKey in
-   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+   * org.apache.spark.rdd.PairRDDFunctions for more information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
     mergeValue: JFunction2[C, V, C],
@@ -176,7 +176,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /**
    * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
    * combineByKey for RDDs. Please refer to combineByKey in
-   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+   * org.apache.spark.rdd.PairRDDFunctions for more information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
     mergeValue: JFunction2[C, V, C],
@@ -479,7 +479,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /**
    * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of the key.
-   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    * @param updateFunc State update function. If `this` function returns None, then
    *                   corresponding state key-value pair will be eliminated.
    * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
@@ -579,7 +579,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    */
   def join[W](
       other: JavaPairDStream[K, W],
@@ -619,7 +619,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    */
   def leftOuterJoin[W](
       other: JavaPairDStream[K, W],
@@ -660,7 +660,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
-   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
    * the partitioning of each RDD.
    */
   def rightOuterJoin[W](
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 2268160dccc1fefebc415416c6d1ff2977a74459..b082bb058529bc5af72539cba6f3f3eb6cdf353e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -406,7 +406,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
    * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
    * In the transform function, convert the JavaRDD corresponding to that JavaDStream to
-   * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+   * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
    */
   def transform[T](
       dstreams: JList[JavaDStream[_]],
@@ -429,7 +429,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
    * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
    * In the transform function, convert the JavaRDD corresponding to that JavaDStream to
-   * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+   * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
    */
   def transform[K, V](
       dstreams: JList[JavaDStream[_]],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index f3c58aede092aceb59b2b9a81ffc08eace6e25df..24734969493605b8c69b121183321282ca8eae2b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -65,7 +65,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying `groupByKey` on each RDD. The supplied
-   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    */
   def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
     val createCombiner = (v: V) => ArrayBuffer[V](v)
@@ -95,7 +95,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+   * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
    * the partitioning of each RDD.
    */
   def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
@@ -376,7 +376,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
   /**
    * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of the key.
-   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    * @param updateFunc State update function. If `this` function returns None, then
    *                   corresponding state key-value pair will be eliminated.
    * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
@@ -396,7 +396,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
   /**
    * Return a new "state" DStream where the state for each key is updated by applying
    * the given function on the previous state of the key and the new values of each key.
-   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    * @param updateFunc State update function. If `this` function returns None, then
    *                   corresponding state key-value pair will be eliminated. Note, that
    *                   this function may generate a different a tuple with a different key
@@ -453,7 +453,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
-   * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
+   * The supplied org.apache.spark.Partitioner is used to partition the generated RDDs.
    */
   def cogroup[W: ClassTag](
       other: DStream[(K, W)],
@@ -483,7 +483,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
-   * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+   * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    */
   def join[W: ClassTag](
       other: DStream[(K, W)],
@@ -518,7 +518,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
-   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
    * the partitioning of each RDD.
    */
   def leftOuterJoin[W: ClassTag](
@@ -554,7 +554,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 
   /**
    * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
-   * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+   * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
    * the partitioning of each RDD.
    */
   def rightOuterJoin[W: ClassTag](
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4fbbce9b8b90ebc8d8a0cd0da6a3c2dc8dd07929..54a0791d04ea4896749a17fdcdf6c372998e81a6 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -19,7 +19,6 @@ package org.apache.spark.streaming;
 
 import scala.Tuple2;
 
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import java.io.*;
@@ -30,7 +29,6 @@ import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import com.google.common.collect.Sets;
 
-import org.apache.spark.SparkConf;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -38,6 +36,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
@@ -45,6 +44,8 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
 // see http://stackoverflow.com/questions/758570/.
 public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
+
+  @SuppressWarnings("unchecked")
   @Test
   public void testCount() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -64,6 +65,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testMap() {
     List<List<String>> inputData = Arrays.asList(
@@ -87,6 +89,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testWindow() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -108,6 +111,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testWindowWithSlideDuration() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -132,6 +136,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testFilter() {
     List<List<String>> inputData = Arrays.asList(
@@ -155,13 +160,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testRepartitionMorePartitions() {
     List<List<Integer>> inputData = Arrays.asList(
       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
-    JavaDStream repartitioned = stream.repartition(4);
+    JavaDStream<Integer> stream =
+        JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
+    JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
+        stream.repartition(4);
     JavaTestUtils.attachTestOutputStream(repartitioned);
     List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
     Assert.assertEquals(2, result.size());
@@ -172,13 +180,16 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testRepartitionFewerPartitions() {
     List<List<Integer>> inputData = Arrays.asList(
       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
       Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
-    JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
-    JavaDStream repartitioned = stream.repartition(2);
+    JavaDStream<Integer> stream =
+        JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
+    JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
+        stream.repartition(2);
     JavaTestUtils.attachTestOutputStream(repartitioned);
     List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
     Assert.assertEquals(2, result.size());
@@ -188,6 +199,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testGlom() {
     List<List<String>> inputData = Arrays.asList(
@@ -206,6 +218,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testMapPartitions() {
     List<List<String>> inputData = Arrays.asList(
@@ -217,16 +230,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       Arrays.asList("YANKEESRED SOCKS"));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
-      @Override
-      public Iterable<String> call(Iterator<String> in) {
-        String out = "";
-        while (in.hasNext()) {
-          out = out + in.next().toUpperCase();
-        }
-        return Lists.newArrayList(out);
-      }
-    });
+    JavaDStream<String> mapped = stream.mapPartitions(
+        new FlatMapFunction<Iterator<String>, String>() {
+          @Override
+          public Iterable<String> call(Iterator<String> in) {
+            String out = "";
+            while (in.hasNext()) {
+              out = out + in.next().toUpperCase();
+            }
+            return Lists.newArrayList(out);
+          }
+        });
     JavaTestUtils.attachTestOutputStream(mapped);
     List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
 
@@ -247,6 +261,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testReduce() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -267,6 +282,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testReduceByWindow() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -289,6 +305,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testQueueStream() {
     List<List<Integer>> expected = Arrays.asList(
@@ -312,6 +329,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testTransform() {
     List<List<Integer>> inputData = Arrays.asList(
@@ -344,6 +362,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testVariousTransform() {
     // tests whether all variations of transform can be called from Java
@@ -423,6 +442,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   }
 
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testTransformWith() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -492,6 +512,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   }
 
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testVariousTransformWith() {
     // tests whether all variations of transformWith can be called from Java
@@ -591,6 +612,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     );
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testStreamingContextTransform(){
     List<List<Integer>> stream1input = Arrays.asList(
@@ -658,6 +680,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testFlatMap() {
     List<List<String>> inputData = Arrays.asList(
@@ -683,6 +706,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     assertOrderInvariantEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairFlatMap() {
     List<List<String>> inputData = Arrays.asList(
@@ -718,22 +742,24 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         new Tuple2<Integer, String>(9, "s")));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-    JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
-      @Override
-      public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
-        List<Tuple2<Integer, String>> out = Lists.newArrayList();
-        for (String letter: in.split("(?!^)")) {
-          out.add(new Tuple2<Integer, String>(in.length(), letter));
-        }
-        return out;
-      }
-    });
+    JavaPairDStream<Integer,String> flatMapped = stream.flatMap(
+        new PairFlatMapFunction<String, Integer, String>() {
+          @Override
+          public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+            List<Tuple2<Integer, String>> out = Lists.newArrayList();
+            for (String letter: in.split("(?!^)")) {
+              out.add(new Tuple2<Integer, String>(in.length(), letter));
+            }
+            return out;
+          }
+        });
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
 
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testUnion() {
     List<List<Integer>> inputData1 = Arrays.asList(
@@ -778,6 +804,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
   // PairDStream Functions
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairFilter() {
     List<List<String>> inputData = Arrays.asList(
@@ -810,7 +837,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
-  List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+  @SuppressWarnings("unchecked")
+  private List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
       Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
                     new Tuple2<String, String>("california", "giants"),
                     new Tuple2<String, String>("new york", "yankees"),
@@ -820,7 +848,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
                     new Tuple2<String, String>("new york", "rangers"),
                     new Tuple2<String, String>("new york", "islanders")));
 
-  List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+  @SuppressWarnings("unchecked")
+  private List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
       Arrays.asList(
           new Tuple2<String, Integer>("california", 1),
           new Tuple2<String, Integer>("california", 3),
@@ -832,6 +861,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
           new Tuple2<String, Integer>("new york", 3),
           new Tuple2<String, Integer>("new york", 1)));
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairMap() { // Maps pair -> pair of different type
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -864,6 +894,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairMapPartitions() { // Maps pair -> pair of different type
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -901,6 +932,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testPairMap2() { // Maps pair -> single
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -925,6 +957,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
@SuppressWarnings("unchecked") @Test public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( @@ -967,6 +1000,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairGroupByKey() { List<List<Tuple2<String, String>>> inputData = stringStringKVStream; @@ -989,6 +1023,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairReduceByKey() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1013,6 +1048,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCombineByKey() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1043,6 +1079,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCountByValue() { List<List<String>> inputData = Arrays.asList( @@ -1068,6 +1105,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testGroupByKeyAndWindow() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1113,6 +1151,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2())); } + @SuppressWarnings("unchecked") @Test public void testReduceByKeyAndWindow() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1136,6 +1175,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testUpdateStateByKey() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1171,6 +1211,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testReduceByKeyAndWindowWithInverse() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; @@ -1194,6 +1235,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCountByValueAndWindow() { List<List<String>> inputData = Arrays.asList( @@ -1227,6 +1269,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, unorderedResult); } + @SuppressWarnings("unchecked") @Test public void testPairTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( @@ -1271,6 +1314,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairToNormalRDDTransform() { List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( @@ -1312,6 +1356,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") + @Test 
   public void testMapValues() {
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
 
@@ -1342,6 +1388,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testFlatMapValues() {
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -1386,6 +1433,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testCoGroup() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1429,6 +1477,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testJoin() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1472,6 +1521,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testLeftOuterJoin() {
     List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
@@ -1503,6 +1553,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testCheckpointMasterRecovery() throws InterruptedException {
     List<List<String>> inputData = Arrays.asList(
@@ -1541,7 +1592,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   }
 
 
-  /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+  /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+  @SuppressWarnings("unchecked")
   @Test
   public void testCheckpointofIndividualStream() throws InterruptedException {
     List<List<String>> inputData = Arrays.asList(
@@ -1581,16 +1633,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
   @Test
   public void testSocketString() {
     class Converter extends Function<InputStream, Iterable<String>> {
-      public Iterable<String> call(InputStream in) {
+      public Iterable<String> call(InputStream in) throws IOException {
         BufferedReader reader = new BufferedReader(new InputStreamReader(in));
         List<String> out = new ArrayList<String>();
-        try {
-          while (true) {
-            String line = reader.readLine();
-            if (line == null) { break; }
-            out.add(line);
-          }
-        } catch (IOException e) { }
+        while (true) {
+          String line = reader.readLine();
+          if (line == null) { break; }
+          out.add(line);
+        }
         return out;
       }
     }
diff --git a/yarn/pom.xml b/yarn/pom.xml
index e7eba36ba351b68b2689ee1cc5d958e387c649d4..c0e133dd603b18ccc6a774dbbd3d205789a55d7b 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -133,7 +133,7 @@
             </goals>
             <configuration>
               <exportAntProperties>true</exportAntProperties>
-              <tasks>
+              <target>
                 <property name="spark.classpath" refid="maven.test.classpath" />
                 <property environment="env" />
                 <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -146,7 +146,7 @@
                     </not>
                   </condition>
                 </fail>
-              </tasks>
+              </target>
             </configuration>
           </execution>
         </executions>