diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 93a8c15e3ec30f2a1cd3519a146cdb950eac213f..efe58ea2e0e780330107fa08b50f2bcd63200e7e 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -222,6 +222,18 @@ streaming_flume_sink = Module( ) +streaming_akka = Module( + name="streaming-akka", + dependencies=[streaming], + source_file_regexes=[ + "external/akka", + ], + sbt_test_goals=[ + "streaming-akka/test", + ] +) + + streaming_flume = Module( name="streaming-flume", dependencies=[streaming], diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 97db865daa371be6c685df74aa48e578bf83119f..95b99862ec0622801d06867b65ab3e11c385eb47 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -257,25 +257,54 @@ The following table summarizes the characteristics of both types of receivers ## Implementing and Using a Custom Actor-based Receiver +<div class="codetabs"> +<div data-lang="scala" markdown="1" > + Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to -receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper) -trait can be mixed in to any Akka actor, which allows received data to be stored in Spark using - `store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc. +receive data. Extending [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver) +allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of +this actor can be configured to handle failures, etc. {% highlight scala %} -class CustomActor extends Actor with ActorHelper { + +class CustomActor extends ActorReceiver { def receive = { case data: String => store(data) } } + +// A new input stream can be created with this custom actor as +val ssc: StreamingContext = ... +val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver") + {% endhighlight %} -And a new input stream can be created with this custom actor as +See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example. +</div> +<div data-lang="java" markdown="1"> + +Custom [Akka UntypedActors](http://doc.akka.io/docs/akka/2.3.11/java/untyped-actors.html) can also be used to +receive data. Extending [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver) +allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of +this actor can be configured to handle failures, etc. + +{% highlight java %} + +class CustomActor extends JavaActorReceiver { + @Override + public void onReceive(Object msg) throws Exception { + store((String) msg); + } +} + +// A new input stream can be created with this custom actor as +JavaStreamingContext jssc = ...; +JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver"); -{% highlight scala %} -val ssc: StreamingContext = ... -val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver") {% endhighlight %} -See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) -for an end-to-end example. 
+See [JavaActorWordCount.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java) for an end-to-end example. +</div> +</div> + +<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala libraries, AkkaUtils is not available in the Python API. diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 8fd075d02b78e96cb66335c081cd442ea752bc97..93c34efb6662ded6b1f065779c7d2ea2909655fc 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -659,11 +659,11 @@ methods for creating DStreams from files and Akka actors as input sources. <span class="badge" style="background-color: grey">Python API</span> `fileStream` is not available in the Python API, only `textFileStream` is available. - **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka - actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver + actors by using `AkkaUtils.createStream(ssc, actorProps, actor-name)`. See the [Custom Receiver Guide](streaming-custom-receivers.html) for more details. <span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala - libraries, `actorStream` is not available in the Python API. + libraries, `AkkaUtils.createStream` is not available in the Python API. - **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream. diff --git a/examples/pom.xml b/examples/pom.xml index 1a0d5e5854642427c7750ccde8881d067a56c035..9437cee2abfdfe16f3a349c81558385143ade230 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -75,6 +75,11 @@ <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId> diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java index 2377207779fece09d40db5f2c4a04d4896f53779..62e563380a9e7eddd9b0c5679bc727635b6e9fef 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java @@ -31,7 +31,8 @@ import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.receiver.JavaActorReceiver; +import org.apache.spark.streaming.akka.AkkaUtils; +import org.apache.spark.streaming.akka.JavaActorReceiver; /** * A sample actor as receiver, is also simplest.
This receiver actor @@ -56,6 +57,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver { remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf()); } + @Override public void onReceive(Object msg) throws Exception { store((T) msg); } @@ -100,18 +102,20 @@ public class JavaActorWordCount { String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor"; /* - * Following is the use of actorStream to plug in custom actor as receiver + * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver * * An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility * to ensure the type safety, i.e type of data received and InputDstream * should be same. * - * For example: Both actorStream and JavaSampleActorReceiver are parameterized + * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized * to same type to ensure type safety. */ - JavaDStream<String> lines = jssc.actorStream( - Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver"); + JavaDStream<String> lines = AkkaUtils.createStream( + jssc, + Props.create(JavaSampleActorReceiver.class, feederActorURI), + "SampleReceiver"); // compute wordcount lines.flatMap(new FlatMapFunction<String, String>() { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 88cdc6bc144e5fb01a3d9bcba5c9b9ad555962fe..8e88987439ffc731e90547df942450705fc57966 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList import scala.reflect.ClassTag import scala.util.Random -import akka.actor.{actorRef2Scala, Actor, ActorRef, Props} +import akka.actor._ +import com.typesafe.config.ConfigFactory -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.receiver.ActorReceiver -import org.apache.spark.util.AkkaUtils +import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} case class SubscribeReceiver(receiverActor: ActorRef) case class UnsubscribeReceiver(receiverActor: ActorRef) @@ -78,8 +78,7 @@ class FeederActor extends Actor { * * @see [[org.apache.spark.examples.streaming.FeederActor]] */ -class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) -extends ActorReceiver { +class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver { lazy private val remotePublisher = context.actorSelection(urlOfPublisher) @@ -108,9 +107,13 @@ object FeederActor { } val Seq(host, port) = args.toSeq - val conf = new SparkConf - val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf, - securityManager = new SecurityManager(conf))._1 + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] + |akka.remote.netty.tcp.hostname = "$host" + |akka.remote.netty.tcp.port = $port + |""".stripMargin) + val actorSystem = ActorSystem("test", akkaConf) val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") println("Feeder started as:" + feeder) @@ -121,6 +124,7 @@ object FeederActor { /** * A sample word count program 
demonstrating the use of plugging in + * * Actor as Receiver * Usage: ActorWordCount <hostname> <port> * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on. @@ -146,20 +150,21 @@ object ActorWordCount { val ssc = new StreamingContext(sparkConf, Seconds(2)) /* - * Following is the use of actorStream to plug in custom actor as receiver + * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver * * An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e type of data received and InputDstream + * to ensure the type safety, i.e type of data received and InputDStream * should be same. * - * For example: Both actorStream and SampleActorReceiver are parameterized + * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized * to same type to ensure type safety. */ - - val lines = ssc.actorStream[String]( - Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format( - host, port.toInt))), "SampleReceiver") + val lines = AkkaUtils.createStream[String]( + ssc, + Props(classOf[SampleActorReceiver[String]], + "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)), + "SampleReceiver") // compute wordcount lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index 96448905760fbfc696c06a8312c9ffebf8870a94..f612e508eb78e5800f97eaec96e7ea2a9142fcaa 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala import akka.util.ByteString import akka.zeromq._ import akka.zeromq.Subscribe +import com.typesafe.config.ConfigFactory -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.zeromq._ @@ -69,10 +70,10 @@ object SimpleZeroMQPublisher { * * To run this example locally, you may run publisher as * `$ bin/run-example \ - * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo` * and run the example as * `$ bin/run-example \ - * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo` + * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo` */ // scalastyle:on object ZeroMQWordCount { @@ -90,7 +91,11 @@ object ZeroMQWordCount { def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator // For this stream, a zeroMQ publisher should be running. 
- val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) + val lines = ZeroMQUtils.createStream( + ssc, + url, + Subscribe(topic), + bytesToStringIterator _) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() diff --git a/external/akka/pom.xml b/external/akka/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..34de9bae00e49c2bb1c7d65a1bc274c5a8262d07 --- /dev/null +++ b/external/akka/pom.xml @@ -0,0 +1,73 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.10</artifactId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-akka_2.10</artifactId> + <properties> + <sbt.project.name>streaming-akka</sbt.project.name> + </properties> + <packaging>jar</packaging> + <name>Spark Project External Akka</name> + <url>http://spark.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${akka.group}</groupId> + <artifactId>akka-actor_${scala.binary.version}</artifactId> + <version>${akka.version}</version> + </dependency> + <dependency> + <groupId>${akka.group}</groupId> + <artifactId>akka-remote_${scala.binary.version}</artifactId> + <version>${akka.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + </build> +</project> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala similarity index 74% rename from 
streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala rename to external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index 0eabf3d260b262057fc950a37343a42ce6999493..c75dc92445b64a8677dfe900b7bf409d07e0c946 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.streaming.receiver +package org.apache.spark.streaming.akka import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicInteger @@ -26,23 +26,44 @@ import scala.reflect.ClassTag import akka.actor._ import akka.actor.SupervisorStrategy.{Escalate, Restart} +import com.typesafe.config.ConfigFactory -import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.{Logging, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver /** * :: DeveloperApi :: * A helper with set of defaults for supervisor strategy */ @DeveloperApi -object ActorSupervisorStrategy { +object ActorReceiver { - val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = + /** + * A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and + * `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for + * others, it just escalates the failure to the supervisor of the supervisor. + */ + val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 15 millis) { case _: RuntimeException => Restart case _: Exception => Escalate } + + /** + * A default ActorSystem creator. It will use a unique system name + * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote + * communication. + */ + val defaultActorSystemCreator: () => ActorSystem = () => { + val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}" + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] + |""".stripMargin) + ActorSystem(uniqueSystemName, akkaConf) + } } /** @@ -58,13 +79,12 @@ object ActorSupervisorStrategy { * } * } * - * // Can be used with an actorStream as follows - * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") + * AkkaUtils.createStream[String](ssc, Props[MyActor](),"MyActorReceiver") * * }}} * * @note Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of push block and InputDStream + * to ensure the type safety, i.e. parametrized type of push block and InputDStream * should be same. 
*/ @DeveloperApi @@ -103,18 +123,18 @@ abstract class ActorReceiver extends Actor { * * @example {{{ * class MyActor extends JavaActorReceiver { - * def receive { - * case anything: String => store(anything) + * @Override + * public void onReceive(Object msg) throws Exception { + * store((String) msg); * } * } * - * // Can be used with an actorStream as follows - * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") + * AkkaUtils.<String>createStream(jssc, Props.create(MyActor.class), "MyActorReceiver"); * * }}} * * @note Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of push block and InputDStream + * to ensure the type safety, i.e. parametrized type of push block and InputDStream * should be same. */ @DeveloperApi @@ -147,8 +167,8 @@ abstract class JavaActorReceiver extends UntypedActor { /** * :: DeveloperApi :: * Statistics for querying the supervisor about state of workers. Used in - * conjunction with `StreamingContext.actorStream` and - * [[org.apache.spark.streaming.receiver.ActorReceiver]]. + * conjunction with `AkkaUtils.createStream` and + * [[org.apache.spark.streaming.akka.ActorReceiverSupervisor]]. */ @DeveloperApi case class Statistics(numberOfMsgs: Int, @@ -157,10 +177,10 @@ case class Statistics(numberOfMsgs: Int, otherInfo: String) /** Case class to receive data sent by child actors */ -private[streaming] sealed trait ActorReceiverData -private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData -private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData -private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData +private[akka] sealed trait ActorReceiverData +private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData +private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData +private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData /** * Provides Actors as receivers for receiving stream. @@ -181,14 +201,16 @@ private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorRec * context.parent ! Props(new Worker, "Worker") * }}} */ -private[streaming] class ActorReceiverSupervisor[T: ClassTag]( +private[akka] class ActorReceiverSupervisor[T: ClassTag]( + actorSystemCreator: () => ActorSystem, props: Props, name: String, storageLevel: StorageLevel, receiverSupervisorStrategy: SupervisorStrategy ) extends Receiver[T](storageLevel) with Logging { - protected lazy val actorSupervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor), + private lazy val actorSystem = actorSystemCreator() + protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor), "Supervisor" + streamId) class Supervisor extends Actor { @@ -241,5 +263,7 @@ private[streaming] class ActorReceiverSupervisor[T: ClassTag]( def onStop(): Unit = { actorSupervisor ! 
PoisonPill + actorSystem.shutdown() + actorSystem.awaitTermination() } } diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala new file mode 100644 index 0000000000000000000000000000000000000000..38c35c5ae7a18a51161ab0835a3b8de3ccc60b73 --- /dev/null +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka + +import scala.reflect.ClassTag + +import akka.actor.{ActorSystem, Props, SupervisorStrategy} + +import org.apache.spark.api.java.function.{Function0 => JFunction0} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +object AkkaUtils { + + /** + * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details. + * + * @param ssc The StreamingContext instance + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor + * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2) + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping (default: + * ActorReceiver.defaultActorSystemCreator) + * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultSupervisorStrategy) + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of data received and createStream + * should be same. + */ + def createStream[T: ClassTag]( + ssc: StreamingContext, + propsForActor: Props, + actorName: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator, + supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy + ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") { + val cleanF = ssc.sc.clean(actorSystemCreator) + ssc.receiverStream(new ActorReceiverSupervisor[T]( + cleanF, + propsForActor, + actorName, + storageLevel, + supervisorStrategy)) + } + + /** + * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+ * + * @param jssc The StreamingContext instance + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor + * @param storageLevel Storage level to use for storing the received objects + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. + * @param supervisorStrategy the supervisor strategy + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of data received and createStream + * should be same. + */ + def createStream[T]( + jssc: JavaStreamingContext, + propsForActor: Props, + actorName: String, + storageLevel: StorageLevel, + actorSystemCreator: JFunction0[ActorSystem], + supervisorStrategy: SupervisorStrategy + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T]( + jssc.ssc, + propsForActor, + actorName, + storageLevel, + () => actorSystemCreator.call(), + supervisorStrategy) + } + + /** + * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. + * + * @param jssc The StreamingContext instance + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor + * @param storageLevel Storage level to use for storing the received objects + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of data received and createStream + * should be same. + */ + def createStream[T]( + jssc: JavaStreamingContext, + propsForActor: Props, + actorName: String, + storageLevel: StorageLevel + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T](jssc.ssc, propsForActor, actorName, storageLevel) + } + + /** + * Create an input stream with a user-defined actor. Storage level of the data will be the default + * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details. + * + * @param jssc The StreamingContext instance + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of data received and createStream + * should be same. + */ + def createStream[T]( + jssc: JavaStreamingContext, + propsForActor: Props, + actorName: String + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T](jssc.ssc, propsForActor, actorName) + } +} diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java new file mode 100644 index 0000000000000000000000000000000000000000..b732506767154cf75195def612743d52c5642702 --- /dev/null +++ b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.SupervisorStrategy; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.Test; + +import org.apache.spark.api.java.function.Function0; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; + +public class JavaAkkaUtilsSuite { + + @Test // tests the API, does not actually test data receiving + public void testAkkaUtils() { + JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + try { + JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream( + jsc, Props.create(JavaTestActor.class), "test"); + JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream( + jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream( + jsc, + Props.create(JavaTestActor.class), + "test", StorageLevel.MEMORY_AND_DISK_SER_2(), + new ActorSystemCreatorForTest(), + SupervisorStrategy.defaultStrategy()); + } finally { + jsc.stop(); + } + } +} + +class ActorSystemCreatorForTest implements Function0<ActorSystem> { + @Override + public ActorSystem call() { + return null; + } +} + + +class JavaTestActor extends JavaActorReceiver { + @Override + public void onReceive(Object message) throws Exception { + store((String) message); + } +} diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..f437585a98e4f53f6fd0c2b61acb211f1d6d831d --- /dev/null +++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.akka + +import akka.actor.{Props, SupervisorStrategy} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +class AkkaUtilsSuite extends SparkFunSuite { + + test("createStream") { + val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000)) + try { + // tests the API, does not actually test data receiving + val test1: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test") + val test2: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2) + val test3: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, + Props[TestActor](), + "test", + StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy = SupervisorStrategy.defaultStrategy) + val test4: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null) + val test5: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null) + val test6: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, + Props[TestActor](), + "test", + StorageLevel.MEMORY_AND_DISK_SER_2, + () => null, + SupervisorStrategy.defaultStrategy) + } finally { + ssc.stop() + } + } +} + +class TestActor extends ActorReceiver { + override def receive: Receive = { + case m: String => store(m) + } +} diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index a72598844907591fafbff2af2b994067b793c2bc..7781aaeed9e0cfd00c85af9d4e6b063ef7407113 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -41,6 +41,11 @@ <version>${project.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala index 506ba8782d3d582caed1e41c45cdfeec438cd053..dd367cd43b8074a27e0b6860e7a0bd02ce4d561b 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala @@ -23,7 +23,7 @@ import akka.util.ByteString import akka.zeromq._ import org.apache.spark.Logging -import org.apache.spark.streaming.receiver.ActorReceiver +import org.apache.spark.streaming.akka.ActorReceiver /** * A receiver to subscribe to ZeroMQ stream. 
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 63cd8a2721f0c481c25c0e288fe34b2ae20c8b1e..1784d6e8623ad33b885470ae15174f671b83d6b2 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -20,29 +20,33 @@ package org.apache.spark.streaming.zeromq import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import akka.actor.{Props, SupervisorStrategy} +import akka.actor.{ActorSystem, Props, SupervisorStrategy} import akka.util.ByteString import akka.zeromq.Subscribe -import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream -import org.apache.spark.streaming.receiver.ActorSupervisorStrategy object ZeroMQUtils { /** * Create an input stream that receives messages pushed by a zeromq publisher. - * @param ssc StreamingContext object - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe Topic to subscribe to + * @param ssc StreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic * and each frame has sequence of byte thus it needs the converter * (which might be deserializer of bytes) to translate from sequence * of sequence of bytes, where sequence refer to a frame * and sub sequence refer to its payload. * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping (default: + * ActorReceiver.defaultActorSystemCreator) + * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultSupervisorStrategy) */ def createStream[T: ClassTag]( ssc: StreamingContext, @@ -50,22 +54,31 @@ subscribe: Subscribe, bytesToObjects: Seq[ByteString] => Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy + actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator, + supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy ): ReceiverInputDStream[T] = { - ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), - "ZeroMQReceiver", storageLevel, supervisorStrategy) + AkkaUtils.createStream( + ssc, + Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), + "ZeroMQReceiver", + storageLevel, + actorSystemCreator, + supervisorStrategy) } /** * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object - * @param publisherUrl Url of remote ZeroMQ publisher - * @param subscribe Topic to subscribe to + * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote ZeroMQ publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each * frame has sequence of byte thus it needs the converter(which might be * deserializer of bytes) to translate from sequence of sequence of bytes, * where sequence refer to a frame and sub sequence refer to its payload. - * @param storageLevel Storage level to use for storing the received objects + * @param storageLevel Storage level to use for storing the received objects + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. + * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy) */ def createStream[T]( jssc: JavaStreamingContext, @@ -73,25 +86,33 @@ object ZeroMQUtils { subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], storageLevel: StorageLevel, + actorSystemCreator: JFunction0[ActorSystem], supervisorStrategy: SupervisorStrategy ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala - createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) + createStream[T]( + jssc.ssc, + publisherUrl, + subscribe, + fn, + storageLevel, + () => actorSystemCreator.call(), + supervisorStrategy) } /** * Create an input stream that receives messages pushed by a zeromq publisher. - * @param jssc JavaStreamingContext object - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe Topic to subscribe to + * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each * frame has sequence of byte thus it needs the converter(which might be * deserializer of bytes) to translate from sequence of sequence of bytes, * where sequence refer to a frame and sub sequence refer to its payload. - * @param storageLevel RDD storage level. + * @param storageLevel RDD storage level. */ def createStream[T]( jssc: JavaStreamingContext, @@ -104,14 +125,19 @@ object ZeroMQUtils { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala - createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel) + createStream[T]( + jssc.ssc, + publisherUrl, + subscribe, + fn, + storageLevel) } /** * Create an input stream that receives messages pushed by a zeromq publisher. 
- * @param jssc JavaStreamingContext object - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe Topic to subscribe to + * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each * frame has sequence of byte thus it needs the converter(which might * be deserializer of bytes) to translate from sequence of sequence of @@ -128,6 +154,10 @@ object ZeroMQUtils { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala - createStream[T](jssc.ssc, publisherUrl, subscribe, fn) + createStream[T]( + jssc.ssc, + publisherUrl, + subscribe, + fn) } } diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java index 417b91eecb0eedf38214cc5487e1acfcf96e3348..9ff4b41f97d507cfa4c54220603fbae2d207044b 100644 --- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -17,14 +17,17 @@ package org.apache.spark.streaming.zeromq; -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; -import org.junit.Test; +import akka.actor.ActorSystem; import akka.actor.SupervisorStrategy; import akka.util.ByteString; import akka.zeromq.Subscribe; +import org.junit.Test; + import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function0; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { @@ -32,19 +35,29 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { public void testZeroMQStream() { String publishUrl = "abc"; Subscribe subscribe = new Subscribe((ByteString)null); - Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() { - @Override - public Iterable<String> call(byte[][] bytes) throws Exception { - return null; - } - }; + Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects(); + Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest(); JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream( ssc, publishUrl, subscribe, bytesToObjects); JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream( ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream( - ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator, SupervisorStrategy.defaultStrategy()); } } + +class BytesToObjects implements Function<byte[][], Iterable<String>> { + @Override + public Iterable<String> call(byte[][] bytes) throws Exception { + return null; + } +} + +class ActorSystemCreatorForTest implements Function0<ActorSystem> { + @Override + public ActorSystem call() { + return null; + } +} diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala 
b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index 35d2e62c68480310113cb172ed23e9bd778f8a59..bac2679cabae58d577f2d02ae0d3853c8d4668e3 100644 --- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -42,14 +42,22 @@ class ZeroMQStreamSuite extends SparkFunSuite { // tests the API, does not actually test data receiving val test1: ReceiverInputDStream[String] = - ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) + ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null) val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null) val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream( ssc, publishUrl, subscribe, bytesToObjects, - StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) + StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy) + val test4: ReceiverInputDStream[String] = + ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) + val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, + StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy) - // TODO: Actually test data receiving + // TODO: Actually test data receiving. 
A real test needs the native ZeroMQ library ssc.stop() } } diff --git a/pom.xml b/pom.xml index fca626991324b76f64f3f5849927ea88a4bbd741..43f08efaae86db721b7bbdfe366711ad98029c09 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ <module>external/flume</module> <module>external/flume-sink</module> <module>external/flume-assembly</module> + <module>external/akka</module> <module>external/mqtt</module> <module>external/mqtt-assembly</module> <module>external/zeromq</module> diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 6469201446f0ce333a39cfabddf3d1ca2a594ac6..905fb4cd90377d2f3dc759d096b98183b1b6d823 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -153,6 +153,16 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log_="), ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"), ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=") + ) ++ Seq( + // SPARK-7799 Add "streaming-akka" project + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$6"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$5"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$4"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$3"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.actorStream") ) ++ Seq( // SPARK-12847 Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus$"), diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 06e561ae0d89bb5c1795b8593a8045619138aa80..3927b88fb0bf660ce08ad9cb0377caa8f8aae360 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -35,11 +35,11 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile val allProjects@Seq(catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, - sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka, + sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingAkka, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe, testTags) = Seq("catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink", - "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter", + "streaming-flume", "streaming-akka", "streaming-kafka", "streaming-mqtt", "streaming-twitter", "streaming-zeromq", "launcher", "unsafe", 
"test-tags").map(ProjectRef(buildLocation, _)) val optionallyEnabledProjects@Seq(yarn, java8Tests, sparkGangliaLgpl, @@ -232,8 +232,9 @@ object SparkBuild extends PomBuild { /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) + // TODO: remove streamingAkka from this list after 2.0.0 allProjects.filterNot(x => Seq(spark, hive, hiveThriftServer, catalyst, repl, - networkCommon, networkShuffle, networkYarn, unsafe, testTags).contains(x)).foreach { + networkCommon, networkShuffle, networkYarn, unsafe, streamingAkka, testTags).contains(x)).foreach { x => enable(MimaBuild.mimaSettings(sparkHome, x))(x) } @@ -649,7 +650,7 @@ object Unidoc { "-public", "-group", "Core Java API", packageList("api.java", "api.java.function"), "-group", "Spark Streaming", packageList( - "streaming.api.java", "streaming.flume", "streaming.kafka", + "streaming.api.java", "streaming.flume", "streaming.akka", "streaming.kafka", "streaming.mqtt", "streaming.twitter", "streaming.zeromq", "streaming.kinesis" ), "-group", "MLlib", packageList( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index b7070dda99dc687433f740427a59573624731626..ec57c05e3b5bb36392e43b70102c4f8c1d63357d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -25,7 +25,6 @@ import scala.collection.mutable.Queue import scala.reflect.ClassTag import scala.util.control.NonFatal -import akka.actor.{Props, SupervisorStrategy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} @@ -42,7 +41,7 @@ import org.apache.spark.serializer.SerializationDebugger import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContextState._ import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver} +import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener} import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils} @@ -295,27 +294,6 @@ class StreamingContext private[streaming] ( } } - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * @param props Props object defining creation of the actor - * @param name Name of the actor - * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2) - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. 
- */ - def actorStream[T: ClassTag]( - props: Props, - name: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy - ): ReceiverInputDStream[T] = withNamedScope("actor stream") { - receiverStream(new ActorReceiverSupervisor[T](props, name, storageLevel, supervisorStrategy)) - } - /** * Create a input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 00f9d8a9e8817e2c6f3777df5b500bcbc66a71a0..7a25ce54b6ff0d0995ece737ff7fbe156ad7eaee 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -24,7 +24,6 @@ import java.util.{List => JList, Map => JMap} import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import akka.actor.{Props, SupervisorStrategy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} @@ -356,69 +355,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf) } - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * @param props Props object defining creation of the actor - * @param name Name of the actor - * @param storageLevel Storage level to use for storing the received objects - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. - */ - def actorStream[T]( - props: Props, - name: String, - storageLevel: StorageLevel, - supervisorStrategy: SupervisorStrategy - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.actorStream[T](props, name, storageLevel, supervisorStrategy) - } - - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * @param props Props object defining creation of the actor - * @param name Name of the actor - * @param storageLevel Storage level to use for storing the received objects - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. - */ - def actorStream[T]( - props: Props, - name: String, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.actorStream[T](props, name, storageLevel) - } - - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. 
- * @param props Props object defining creation of the actor - * @param name Name of the actor - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. - */ - def actorStream[T]( - props: Props, - name: String - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.actorStream[T](props, name) - } - /** * Create an input stream from an queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue.
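Usage note: because this patch removes `StreamingContext.actorStream` / `JavaStreamingContext.actorStream` and moves actor-based receivers into the new `spark-streaming-akka` module, downstream applications have to add a dependency on the `spark-streaming-akka` artifact (as the `examples/pom.xml` change above does) and switch to `AkkaUtils.createStream`. Below is a minimal, hypothetical migration sketch based only on the APIs introduced in this patch (`ActorReceiver` and `AkkaUtils.createStream` in `org.apache.spark.streaming.akka`); the `WordReceiver` and `MigrationSketch` names are illustrative and not part of the change.

```scala
// Hypothetical migration sketch (not part of this patch).
// Assumes a build dependency on org.apache.spark:spark-streaming-akka_2.10, added by this change.
import akka.actor.Props

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

// A minimal receiver actor: store(...) hands each received record to Spark,
// as described in docs/streaming-custom-receivers.md above.
class WordReceiver extends ActorReceiver {
  override def receive: Receive = {
    case s: String => store(s)
  }
}

object MigrationSketch {
  def main(args: Array[String]): Unit = {
    // setMaster("local[2]") is only for running the sketch locally.
    val sparkConf = new SparkConf().setAppName("MigrationSketch").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Before this patch:
    //   val lines = ssc.actorStream[String](Props[WordReceiver](), "WordReceiver")
    // After this patch, the equivalent entry point lives in the streaming-akka module:
    val lines = AkkaUtils.createStream[String](ssc, Props[WordReceiver](), "WordReceiver")

    // Same word count as in ActorWordCount.scala above.
    lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The storage level, `ActorSystem` creator, and supervisor strategy arguments keep the defaults documented in `AkkaUtils.createStream` (`StorageLevel.MEMORY_AND_DISK_SER_2`, `ActorReceiver.defaultActorSystemCreator`, and `ActorReceiver.defaultSupervisorStrategy`); they can be passed explicitly when the receiver needs remote communication settings or custom failure handling.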