diff --git a/external/flume/pom.xml b/external/flume/pom.xml index fae84a31b879c257fb9dcbcce9b12be3a272ddd8..a682f0e8471d8f50b1e2a43a4d048aaa672210e8 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -39,19 +39,13 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${project.version}</version> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume-sink_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-sdk</artifactId> diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000000000000000000000000000000000000..6e1f01900071b30fe0ecc7e4546dd4bd29ccaf69 --- /dev/null +++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + } +} diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala new file mode 100644 index 0000000000000000000000000000000000000000..1a900007b696b7cfc76866086019b79c2130e0ab --- /dev/null +++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import java.io.{IOException, ObjectInputStream} + +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.dstream.{DStream, ForEachDStream} +import org.apache.spark.util.Utils + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** + * This is a output stream just for the testsuites. All the output is collected into a + * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. + * + * The buffer contains a sequence of RDD's, each containing a sequence of items + */ +class TestOutputStream[T: ClassTag](parent: DStream[T], + val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]()) + extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { + val collected = rdd.collect() + output += collected + }) { + + // This is to clear the output buffer every it is read from a checkpoint + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { + ois.defaultReadObject() + output.clear() + } +} diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 475026e8eb1407f26022a6b8949eb2e66f21725c..b57a1c71e35b9240623466d0fe2b676a759c9c6a 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -20,9 +20,6 @@ package org.apache.spark.streaming.flume import java.net.InetSocketAddress import java.util.concurrent.{Callable, ExecutorCompletionService, Executors} -import java.util.Random - -import org.apache.spark.TestUtils import scala.collection.JavaConversions._ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} @@ -32,20 +29,35 @@ import org.apache.flume.channel.MemoryChannel import org.apache.flume.conf.Configurables import org.apache.flume.event.EventBuilder +import org.scalatest.{BeforeAndAfter, FunSuite} + +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.util.ManualClock -import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext} +import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext} import org.apache.spark.streaming.flume.sink._ import org.apache.spark.util.Utils -class FlumePollingStreamSuite extends TestSuiteBase { +class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging { val batchCount = 5 val eventsPerBatch = 100 val totalEventsPerChannel = batchCount * eventsPerBatch val channelCapacity = 5000 val maxAttempts = 5 + val batchDuration = Seconds(1) + + val conf = new SparkConf() + .setMaster("local[2]") + .setAppName(this.getClass.getSimpleName) + + def beforeFunction() { + logInfo("Using manual clock") + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + } + + before(beforeFunction()) test("flume polling test") { testMultipleTimes(testFlumePolling) @@ -229,4 +241,5 @@ class FlumePollingStreamSuite extends TestSuiteBase { null } } + } diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index ce74b0b77752ccdae4a3affb326bb0c180de2b0f..b3f44471cd326a96d18bd24b83972a9b72cdaaec 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -39,13 +39,7 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 8c5bd79304e29e6aa65d1a4a26395fc70dc6374d..703806735b3ff683c8106ae42634f1052700f96b 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -39,13 +39,7 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> + <scope>provided</scope> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000000000000000000000000000000000000..6e1f01900071b30fe0ecc7e4546dd4bd29ccaf69 --- /dev/null +++ b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + } +} diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 467fd263e2d6418fbc40486588a0e6ea1c057b53..84595acf45ccb3a8a6e19e16126caeb06ad7a8e2 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -17,11 +17,19 @@ package org.apache.spark.streaming.mqtt -import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.scalatest.FunSuite + +import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream -class MQTTStreamSuite extends TestSuiteBase { +class MQTTStreamSuite extends FunSuite { + + val batchDuration = Seconds(1) + + private val master: String = "local[2]" + + private val framework: String = this.getClass.getSimpleName test("mqtt input stream") { val ssc = new StreamingContext(master, framework, batchDuration) diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index 399c30a51a89fca572a86d0522133911c6df4323..000ace1446e5ef8b5570ce72f97268c352371a53 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -39,13 +39,7 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> + <scope>provided</scope> </dependency> <dependency> <groupId>org.twitter4j</groupId> diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000000000000000000000000000000000000..6e1f01900071b30fe0ecc7e4546dd4bd29ccaf69 --- /dev/null +++ b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + } +} diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala index 93741e037516446f05f50a2decc03b110b976cd1..9ee57d7581d85a07710241315b148ad4f4bac6af 100644 --- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala +++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -17,13 +17,23 @@ package org.apache.spark.streaming.twitter -import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} -import org.apache.spark.storage.StorageLevel + +import org.scalatest.{BeforeAndAfter, FunSuite} +import twitter4j.Status import twitter4j.auth.{NullAuthorization, Authorization} + +import org.apache.spark.Logging +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream -import twitter4j.Status -class TwitterStreamSuite extends TestSuiteBase { +class TwitterStreamSuite extends FunSuite with BeforeAndAfter with Logging { + + val batchDuration = Seconds(1) + + private val master: String = "local[2]" + + private val framework: String = this.getClass.getSimpleName test("twitter input stream") { val ssc = new StreamingContext(master, framework, batchDuration) diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index e4539167ce756373c4628aa3ae96b41bbab2726e..29c452093502ed0f61a8bda6832f1d77a0da1aed 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -39,13 +39,7 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> + <scope>provided</scope> </dependency> <dependency> <groupId>${akka.group}</groupId> diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000000000000000000000000000000000000..6e1f01900071b30fe0ecc7e4546dd4bd29ccaf69 --- /dev/null +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + } +} diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index cc10ff6ae03cde6617be37d59bdce395d7123d67..a7566e733d891306bd743d22a16e42768d0dc720 100644 --- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -20,12 +20,19 @@ package org.apache.spark.streaming.zeromq import akka.actor.SupervisorStrategy import akka.util.ByteString import akka.zeromq.Subscribe +import org.scalatest.FunSuite import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream -class ZeroMQStreamSuite extends TestSuiteBase { +class ZeroMQStreamSuite extends FunSuite { + + val batchDuration = Seconds(1) + + private val master: String = "local[2]" + + private val framework: String = this.getClass.getSimpleName test("zeromq input stream") { val ssc = new StreamingContext(master, framework, batchDuration)