Commit 56b9bd19 authored by Prashant Sharma's avatar Prashant Sharma

Plug in actor as stream receiver API

parent bb6ab92e
---
layout: global
title: Tutorial - Spark Streaming, plugging in a custom receiver.
---
A "Spark streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need.
This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
## A quick and naive walk-through
### Write a simple receiver
This starts with implementing an [Actor](#References).
The following is a simple socket text-stream receiver, admittedly over-simplified, built on Akka's socket IO API.
{% highlight scala %}
import akka.actor.{Actor, IO, IOManager}
import akka.util.ByteString
import spark.streaming.receivers.Data

class SocketTextStreamReceiver(host: String,
    port: Int,
    bytesToString: ByteString => String) extends Actor {
  override def preStart = IOManager(context.system).connect(host, port)
  def receive = {
    // Forward each chunk read from the socket to the supervising worker
    case IO.Read(socket, bytes) => context.parent ! Data(bytesToString(bytes))
  }
}
{% endhighlight %}
_Please see the implementations of NetworkReceiver for examples of more generic receivers._
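Alternatively, where an actor is not a good fit, a receiver can extend NetworkReceiver directly and be plugged in with `pluggableNetworkStream`. The sketch below is illustrative only, modeled on the ActorReceiver added in this change; the class name and the polling thread are hypothetical:
{% highlight scala %}
import spark.storage.StorageLevel
import spark.streaming.dstream.NetworkReceiver

// Illustrative receiver that emits the same string repeatedly
class ConstantStringReceiver(msg: String) extends NetworkReceiver[String] {

  // BlockGenerator batches individual records into storage blocks
  protected lazy val blocksGenerator: BlockGenerator =
    new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)

  protected def onStart() = {
    blocksGenerator.start()
    // A real receiver would pull data from an external source here
    new Thread("ConstantStringReceiver") {
      override def run() {
        while (true) { blocksGenerator += msg; Thread.sleep(100) }
      }
    }.start()
  }

  protected def onStop() = {}
}
{% endhighlight %}
Such a receiver could then be plugged in with `ssc.pluggableNetworkStream(new ConstantStringReceiver("hello"))`.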
### A sample Spark application
* First, create a Spark Streaming context with the master URL and batch duration.
{% highlight scala %}
val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
  Seconds(batchDuration))
{% endhighlight %}
* Plug the actor configuration into the Spark Streaming context to create a DStream.
{% highlight scala %}
val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
  "localhost", 8445, z => z.utf8String)), "SocketReceiver")
{% endhighlight %}
* Process it.
{% highlight scala %}
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
{% endhighlight %}
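Putting the pieces together, a complete minimal application might look like the following sketch, reusing the SocketTextStreamReceiver defined above (the object name and the hard-coded arguments are illustrative):
{% highlight scala %}
import akka.actor.Props
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

object SocketTextStreamWordCount {
  def main(args: Array[String]) {
    // local[n] with n > 1, so one thread is free to run the receiver
    val ssc = new StreamingContext("local[2]", "WordCountCustomStreamSource",
      Seconds(2))
    val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
      "localhost", 8445, z => z.utf8String)), "SocketReceiver")
    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
    ssc.start()
  }
}
{% endhighlight %}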
* Once the application is running, the stream can be exercised using the netcat utility.
{% highlight bash %}
$ nc -l localhost 8445
hello world
hello hello
{% endhighlight %}
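If both lines arrive within the same batch, the application should print counts along the lines of `(hello,3)` and `(world,1)` for that batch.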
## Multiple homogeneous/heterogeneous receivers
A DStream union operation is provided for taking the union of multiple input streams.
{% highlight scala %}
val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
  "localhost", 8445, z => z.utf8String)), "SocketReceiver")

// Another socket stream receiver
val lines2 = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
  "localhost", 8446, z => z.utf8String)), "SocketReceiver")
val union = lines.union(lines2)
{% endhighlight %}
The above stream can then be processed as described earlier.
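For instance, the word count from the walk-through applies directly to the combined stream:
{% highlight scala %}
val words = union.flatMap(_.split(" "))
words.map(x => (x, 1)).reduceByKey(_ + _).print()
ssc.start()
{% endhighlight %}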
_A more comprehensive example is provided in the Spark Streaming examples._
## References
1. [Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
package spark.streaming.examples
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala
import spark.streaming.Seconds
import spark.streaming.StreamingContext
import spark.streaming.StreamingContext.toPairDStreamFunctions
import spark.streaming.receivers.Data
case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
/**
 * A sample receiver actor, kept as simple as possible. This receiver actor
 * subscribes to a typical publisher/feeder actor and receives data; it is
 * therefore important to have the feeder running before this example can be
 * run. Please see FileTextStreamFeeder (sample) for the feeder of this
 * receiver.
 */
class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
  extends Actor {

  private lazy val remotePublisher = context.actorFor(urlOfPublisher)

  // Subscribe to the remote publisher on startup, unsubscribe on shutdown
  override def preStart = remotePublisher ! SubscribeReceiver(context.self)

  def receive = {
    // Forward every message to the supervisor, which stores it as a block
    case msg => context.parent ! Data(msg.asInstanceOf[T])
  }

  override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
}
/**
 * A sample word count program demonstrating the use of plugging in an
 * Akka actor as a receiver.
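 *
 * The feeder must be started before this program; for instance (illustrative
 * commands, assuming this Spark version's standard `run` script):
 *   $ ./run spark.streaming.examples.FileTextStreamFeeder 127.0.0.1 9999 1 <files>
 * and then, in another shell:
 *   $ ./run spark.streaming.examples.AkkaActorWordCount local[2] 2 127.0.0.1 9999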
*/
object AkkaActorWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println(
        "Usage: AkkaActorWordCount <master> <batch-duration in seconds>" +
          " <remoteAkkaHost> <remoteAkkaPort>" +
          "\nIn local mode, <master> should be 'local[n]' with n > 1")
      System.exit(1)
    }
    val Seq(master, batchDuration, remoteAkkaHost, remoteAkkaPort) = args.toSeq

    // Create the context and set the batch duration
    val ssc = new StreamingContext(master, "AkkaActorWordCount",
      Seconds(batchDuration.toLong))
    /*
     * The following uses pluggableActorStream to plug in a custom actor as a
     * receiver.
     *
     * An important point to note:
     * Since the actor may exist outside the Spark framework, it is the user's
     * responsibility to ensure type safety, i.e. the type of the data received
     * and of the PluggableInputDStream should be the same.
     *
     * For example: both pluggableActorStream and SampleActorReceiver are
     * parameterized to the same type to ensure type safety.
     */
    val lines = ssc.pluggableActorStream[String](
      Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
        remoteAkkaHost, remoteAkkaPort.toInt))), "SampleReceiver")

    // Compute the word counts and print them for each batch
    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()

    ssc.start()
  }
}
package spark.streaming.examples
import java.util.concurrent.CountDownLatch
import scala.collection.mutable.LinkedList
import scala.io.Source
import akka.actor.{ Actor, ActorRef, actorRef2Scala }
import akka.actor.Props
import spark.util.AkkaUtils
/**
 * A feeder to which multiple receiver actors (their number is given by
 * "noOfReceivers") subscribe, and which then streams the text of the given
 * file(s) to them as messages. This is provided as a demonstration
 * application for trying out the actor-as-receiver feature. Please see
 * SampleActorReceiver or the AkkaActorWordCount example for details about
 * the receivers of this feeder.
 */
object FileTextStreamFeeder {

  var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
  var countdownLatch: CountDownLatch = _

  def main(args: Array[String]) = args.toList match {
    case host :: port :: noOfReceivers :: fileNames =>
      val acs = AkkaUtils.createActorSystem("spark", host, port.toInt)._1
      countdownLatch = new CountDownLatch(noOfReceivers.toInt)
      val actor = acs.actorOf(Props(new FeederActor), "FeederActor")
      countdownLatch.await() // wait for all the receivers to subscribe
      for (fileName <- fileNames; line <- Source.fromFile(fileName).getLines) {
        actor ! line
      }
      acs.awaitTermination()
    case _ =>
      System.err.println(
        "Usage: FileTextStreamFeeder <hostname> <port> <no_of_receivers> <filenames>")
      System.exit(1)
  }
  /**
   * Sends the content to every subscribed receiver
   */
  class FeederActor extends Actor {
    def receive: Receive = {
      case SubscribeReceiver(receiverActor: ActorRef) =>
        println("received subscribe from %s".format(receiverActor.toString))
        receivers = LinkedList(receiverActor) ++ receivers
        countdownLatch.countDown()
      case UnsubscribeReceiver(receiverActor: ActorRef) =>
        println("received unsubscribe from %s".format(receiverActor.toString))
        // filterNot removes the receiver wherever it appears in the list;
        // dropWhile would only have removed a matching prefix
        receivers = receivers.filterNot(_ eq receiverActor)
      case textMessage: String =>
        receivers.foreach(_ ! textMessage)
    }
  }
}
package spark.streaming

import akka.actor.Props
import spark.streaming.dstream._
import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.storage.StorageLevel
import spark.util.MetadataCleaner
import spark.streaming.receivers.ActorReceiver
import spark.streaming.receivers.Settings

import scala.collection.mutable.Queue
@@ -134,6 +139,30 @@ class StreamingContext private (
  protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
  /**
   * Create an input stream with any arbitrary user-implemented network receiver.
   * @param receiver Custom implementation of NetworkReceiver
   */
  def pluggableNetworkStream[T: ClassManifest](
      receiver: NetworkReceiver[T]): DStream[T] = {
    val inputStream = new PluggableInputDStream[T](this, receiver)
    graph.addInputStream(inputStream)
    inputStream
  }

  /**
   * Create an input stream with any arbitrary user-implemented Akka actor receiver.
   * @param props Props object defining creation of the actor
   * @param name Name of the actor
   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_ONLY_SER_2.
   */
  def pluggableActorStream[T: ClassManifest](
      props: Props, name: String,
      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = {
    pluggableNetworkStream(new ActorReceiver(Settings(props, name, storageLevel)))
  }
  /**
   * Create an input stream that pulls messages from a Kafka broker.
   * @param hostname ZooKeeper hostname.
...
package spark.streaming.dstream

import spark.streaming.StreamingContext

class PluggableInputDStream[T: ClassManifest](
    @transient ssc_ : StreamingContext,
    receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {

  def getReceiver(): NetworkReceiver[T] = {
    receiver
  }
}
package spark.streaming.receivers
import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
import akka.actor.{ actorRef2Scala, ActorRef }
import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
import spark.storage.StorageLevel
import spark.streaming.dstream.NetworkReceiver
import java.util.concurrent.atomic.AtomicInteger
/** A helper with a set of defaults for the supervisor strategy */
object ReceiverSupervisorStrategy {

  import akka.util.duration._
  import akka.actor.SupervisorStrategy._

  val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10,
    withinTimeRange = 15 millis) {
    case _: RuntimeException => Restart
    case _: Exception => Escalate
  }
}
/**
 * Settings for configuring the actor creation or defining the supervisor strategy
 */
case class Settings(props: Props,
  name: String,
  storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
  supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy)
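// Illustrative: pluggableActorStream uses the default strategy above. To
// override it, the receiver can be constructed directly, e.g.
//   ssc.pluggableNetworkStream(new ActorReceiver[String](
//     Settings(Props(new MyActor), "MyReceiver",
//       supervisorStrategy = myCustomStrategy)))
// where MyActor and myCustomStrategy are hypothetical user-defined names.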
/**
 * Statistics for querying the supervisor about the state of workers
 */
case class Statistics(numberOfMsgs: Int,
  numberOfWorkers: Int,
  numberOfHiccups: Int,
  otherInfo: String)

/** Case class to receive data sent by child actors */
case class Data[T: ClassManifest](data: T)
/**
 * Provides actors as receivers for receiving a stream.
 *
 * Actors can be used to receive data from almost any stream source. A nice
 * set of abstractions for actors as receivers is already provided for a few
 * general cases. It is thus exposed as an API where the user may bring his
 * own actor to run as a receiver for a Spark Streaming input source.
 */
class ActorReceiver[T: ClassManifest](settings: Settings)
  extends NetworkReceiver[T] {

  protected lazy val blocksGenerator: BlockGenerator =
    new BlockGenerator(settings.storageLevel)

  protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
    "Supervisor" + streamId)

  private class Supervisor extends Actor {

    override val supervisorStrategy = settings.supervisorStrategy
    val worker = context.actorOf(settings.props, settings.name)
    logInfo("Started receiver worker at:" + worker.path)

    val n: AtomicInteger = new AtomicInteger(0)
    val hiccups: AtomicInteger = new AtomicInteger(0)

    def receive = {
      // Spawn an additional, anonymous worker from the given Props
      case props: Props =>
        val worker = context.actorOf(props)
        logInfo("Started receiver worker at:" + worker.path)
        sender ! worker

      // Spawn an additional, named worker
      case (props: Props, name: String) =>
        val worker = context.actorOf(props, name)
        logInfo("Started receiver worker at:" + worker.path)
        sender ! worker

      // Count potentially harmful control messages as hiccups
      case _: PossiblyHarmful => hiccups.incrementAndGet()

      // Report the state of this supervision tree
      case _: Statistics =>
        val workers = context.children
        sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))

      // A whole iterator of records is pushed as a single block;
      // a single record is appended to the block generator
      case Data(iter: Iterator[_]) => push(iter.asInstanceOf[Iterator[T]])

      case Data(msg) =>
        blocksGenerator += msg.asInstanceOf[T]
        n.incrementAndGet
    }
  }
  protected def push(iter: Iterator[T]) {
    pushBlock("block-" + streamId + "-" + System.nanoTime(),
      iter, null, settings.storageLevel)
  }

  protected def onStart() = {
    blocksGenerator.start()
    supervisor // force the lazy val, creating the supervision tree
    logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
  }

  protected def onStop() = {
    supervisor ! PoisonPill
  }
}