diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index 984909cb947a114c875cc4d2a8dacbabc26813be..df901997e17d132d2059af53b205048888cede0b 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -58,7 +58,7 @@ public class JavaStreamingTestExample { private static int timeoutCounter = 0; - public static void main(String[] args) { + public static void main(String[] args) throws Exception { if (args.length != 3) { System.err.println("Usage: JavaStreamingTestExample " + "<dataDir> <batchDuration> <numBatchesTimeout>"); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java index 4544ad2b42ca70bc70c88d0505aa3128df824b29..1cba565b38c2aeb39a3fc48a39089cfec0a8bde9 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java @@ -58,7 +58,7 @@ import java.util.regex.Pattern; public class JavaCustomReceiver extends Receiver<String> { private static final Pattern SPACE = Pattern.compile(" "); - public static void main(String[] args) { + public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: JavaCustomReceiver <hostname> <port>"); System.exit(1); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java index 769b21cecfb8015af76a9437a5054d7c0021f030..ed118f86c058b4f9a128db6b4f24c3fb9a8256f0 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java @@ -21,6 +21,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Arrays; import java.util.Iterator; +import java.util.Map; +import java.util.Set; import java.util.regex.Pattern; import scala.Tuple2; @@ -47,7 +49,7 @@ import org.apache.spark.streaming.Durations; public final class JavaDirectKafkaWordCount { private static final Pattern SPACE = Pattern.compile(" "); - public static void main(String[] args) { + public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" + " <brokers> is a list of one or more Kafka brokers\n" + @@ -64,8 +66,8 @@ public final class JavaDirectKafkaWordCount { SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); - HashSet<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(","))); - HashMap<String, String> kafkaParams = new HashMap<>(); + Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(","))); + Map<String, String> kafkaParams = new HashMap<>(); kafkaParams.put("metadata.broker.list", brokers); // Create direct kafka stream with brokers and topics diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java index bae4b78ac2f472b8568b6586b339cd3cd7bdfd23..33c0a2df2fe43eb0c5fba09c7aafc7292cddfae2 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java @@ -43,7 +43,7 @@ public final class JavaFlumeEventCount { private JavaFlumeEventCount() { } - public static void main(String[] args) { + public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: JavaFlumeEventCount <host> <port>"); System.exit(1); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java index 655da6840cc57bb2941f6fac4b9e51c7c8b3c48a..8a5fd533720413771c1c84438b4dfd05a3ab5de6 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java @@ -57,7 +57,7 @@ public final class JavaKafkaWordCount { private JavaKafkaWordCount() { } - public static void main(String[] args) { + public static void main(String[] args) throws Exception { if (args.length < 4) { System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>"); System.exit(1); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java index 5761da684b4673334d19e15359a4b6aa2153fa76..7a8fe99f48f27be22326b7368bc266f9d70f6a31 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java @@ -48,7 +48,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; public final class JavaNetworkWordCount { private static final Pattern SPACE = Pattern.compile(" "); - public static void main(String[] args) { + public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: JavaNetworkWordCount <hostname> <port>"); System.exit(1); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java index e5fb2bfbfae7b22e65604c6253a7ad4b58e6e6cf..05631494484beabc0493c14b828f9f8a590db3d8 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java @@ -183,7 +183,7 @@ public final class JavaRecoverableNetworkWordCount { return ssc; } - public static void main(String[] args) { + public static void main(String[] args) throws Exception { if (args.length != 4) { System.err.println("You arguments were " + Arrays.asList(args)); System.err.println( diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java index 4b9d9efc8549ae0e4e547d270caf9fb363e1c4a2..7aa8862761d2b6ae1ee799dab678e9c02d5bba3d 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java @@ -53,7 +53,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; public final class JavaSqlNetworkWordCount { private static final Pattern SPACE = Pattern.compile(" "); - public static void main(String[] args) { + public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: JavaNetworkWordCount <hostname> <port>"); System.exit(1); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index 4230dab52e5d42207a49e07f63dfb3cc67e5e899..ed36df852ace689f0ec9d6530b1c49dda38c74b0 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -50,7 +50,7 @@ import org.apache.spark.streaming.api.java.*; public class JavaStatefulNetworkWordCount { private static final Pattern SPACE = Pattern.compile(" "); - public static void main(String[] args) { + public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: JavaStatefulNetworkWordCount <hostname> <port>"); System.exit(1); diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java index 0e43e9272d7c319daf98dc369f27eab1e6c7c487..d40bd3ff560d6fb603d19f14213b2cbb16dfa718 100644 --- a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.regex.Pattern; import com.amazonaws.regions.RegionUtils; -import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; @@ -81,9 +80,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn */ public final class JavaKinesisWordCountASL { // needs to be public for access from run-example private static final Pattern WORD_SEPARATOR = Pattern.compile(" "); - private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class); - public static void main(String[] args) { + public static void main(String[] args) throws Exception { // Check that all required args were passed in. if (args.length != 3) { System.err.println( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 922e4a5e4d9cc7bd3ee3b35d84fb4e009b05d4d8..7e78fa1d7e159ce7921ca9119c60ebe62cf75881 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -558,6 +558,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * Wait for the execution to stop. Any exceptions that occurs during the execution * will be thrown in this thread. */ + @throws[InterruptedException] def awaitTermination(): Unit = { ssc.awaitTermination() } @@ -570,6 +571,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * @return `true` if it's stopped; or throw the reported error during the execution; or `false` * if the waiting time elapsed before returning from the method. */ + @throws[InterruptedException] def awaitTerminationOrTimeout(timeout: Long): Boolean = { ssc.awaitTerminationOrTimeout(timeout) }