From f7d3e309cb76ef208ab51f23c90c5e891fb333a3 Mon Sep 17 00:00:00 2001
From: Prashant Sharma <prashant.s@imaginea.com>
Date: Fri, 8 Feb 2013 16:56:42 +0530
Subject: [PATCH] ZeroMQ stream as receiver

---
 docs/zeroMQ-intro.md                          | 59 +++++++++++++++++++
 project/SparkBuild.scala                      |  1 +
 .../spark/streaming/StreamingContext.scala    | 22 +++++++
 .../streaming/receivers/ZeroMQReceiver.scala  | 33 +++++++++++
 4 files changed, 115 insertions(+)
 create mode 100644 docs/zeroMQ-intro.md
 create mode 100644 streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala

diff --git a/docs/zeroMQ-intro.md b/docs/zeroMQ-intro.md
new file mode 100644
index 0000000000..0365bc08fd
--- /dev/null
+++ b/docs/zeroMQ-intro.md
@@ -0,0 +1,59 @@
+---
+layout: global
+title: ZeroMQ Stream setup guide
+---
+
+## Install ZeroMQ (using JNA)
+
+To work with ZeroMQ, some native libraries have to be installed.
+
+* Install the ZeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
+
+   For example, on Ubuntu 12.04 you can run:
+
+    `$ sudo apt-get install libzmq1`
+
+ __To work with akka-zeromq, ZeroMQ version 2.1 is supported via [JNA](https://github.com/twall/jna). In case you want to switch to ZeroMQ 3.0, install [JZMQ](http://www.zeromq.org/bindings:java), which uses [JNI](http://docs.oracle.com/javase/6/docs/technotes/guides/jni/), and drop in the jzmq jar.__
+
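+As a quick sanity check that the native library can be loaded through JNA, you can ask akka-zeromq for the ZeroMQ version it detects, for example from a Scala REPL with akka-zeromq on the classpath. This is only a sketch and assumes the `version` accessor exposed by `ZeroMQExtension` in this Akka release:
+
+{% highlight scala %}
+
+    import akka.actor.ActorSystem
+    import akka.zeromq.ZeroMQExtension
+
+    val system = ActorSystem()
+    // `version` is assumed to be available on ZeroMQExtension here; it should report
+    // the native zeromq library that was picked up, e.g. 2.1.x
+    println(ZeroMQExtension(system).version)
+
+{% endhighlight %}
+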
+## Sample scala code
+
+A publisher is an entity assumed to be outside the Spark ecosystem. A sample ZeroMQ publisher is provided below so you can try out the sample Spark ZeroMQ application.
+
+1. Start the sample publisher.
+
+{% highlight scala %}
+
+    import akka.actor.ActorSystem
+    import akka.zeromq._
+
+    // Bring up an actor system and bind a PUB socket to the given url
+    val acs: ActorSystem = ActorSystem()
+    val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
+
+    // The first frame carries the topic, the remaining frames carry the payload
+    pubSocket ! ZMQMessage(Seq(Frame("topic"), Frame("My message".getBytes)))
+
+{% endhighlight %}
+
+A typical ZeroMQ url looks like `tcp://<ip>:<port>`.
+
+The publisher does nothing more than publish the message on the specified topic and url.
+
+2. Start the Spark application by plugging in the ZeroMQ stream receiver.
+
+{% highlight scala %}
+
+    val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToObjectsIterator)
+
+{% endhighlight %}
+
+`bytesToObjectsIterator` is a function for decoding the Frame data.
+
+_For example, to decode into strings using the default charset:_
+
+{% highlight scala %}
+
+    def bytesToStringIterator(x: Seq[Seq[Byte]]) = x.map(frame => new String(frame.toArray)).iterator
+
+{% endhighlight %}
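+
+Putting the two steps together, here is a minimal sketch of a complete receiving application. The master string `local[2]`, the application name `ZeroMQExample`, the url `tcp://127.0.0.1:1234` and the topic `foo` are placeholder values, and the three-argument StreamingContext constructor (master, job name, batch duration) is assumed from the existing streaming API:
+
+{% highlight scala %}
+
+    import akka.zeromq.Subscribe
+
+    import spark.streaming.{Seconds, StreamingContext}
+
+    def bytesToStringIterator(x: Seq[Seq[Byte]]) = x.map(frame => new String(frame.toArray)).iterator
+
+    // Placeholder master, url and topic; substitute your own values
+    val ssc = new StreamingContext("local[2]", "ZeroMQExample", Seconds(2))
+    val lines = ssc.zeroMQStream("tcp://127.0.0.1:1234", Subscribe("foo"), bytesToStringIterator _)
+
+    lines.print()
+    ssc.start()
+
+{% endhighlight %}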
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c6d3cc8b15..5fe85a28c3 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -134,6 +134,7 @@ object SparkBuild extends Build {
       "com.typesafe.akka" % "akka-actor" % "2.0.3",
       "com.typesafe.akka" % "akka-remote" % "2.0.3",
       "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
+      "com.typesafe.akka" % "akka-zeromq" % "2.0.3",
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
       "cc.spray" % "spray-can" % "1.0-M2.1",
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index a9684c5772..8c772aec6e 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -2,12 +2,14 @@ package spark.streaming
 
 import akka.actor.Props
 import akka.actor.SupervisorStrategy
+import akka.zeromq.Subscribe
 
 import spark.streaming.dstream._
 
 import spark.{RDD, Logging, SparkEnv, SparkContext}
 import spark.streaming.receivers.ActorReceiver
 import spark.streaming.receivers.ReceiverSupervisorStrategy
+import spark.streaming.receivers.ZeroMQReceiver
 import spark.storage.StorageLevel
 import spark.util.MetadataCleaner
 import spark.streaming.receivers.ActorReceiver
@@ -174,6 +176,26 @@ class StreamingContext private (
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
   }
 
+  /**
+   * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+   * @param publisherUrl Url of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each
+   *                       frame is a sequence of bytes. A converter (which might be a
+   *                       deserializer) is therefore needed to translate a sequence of byte
+   *                       sequences, where each inner sequence is a frame's payload, into
+   *                       objects of type T.
+   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_ONLY_SER_2.
+   * @param supervisorStrategy Supervisor strategy of the receiver actor. Defaults to
+   *                           ReceiverSupervisorStrategy.defaultStrategy.
+   */
+  def zeroMQStream[T: ClassManifest](publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
+    actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+      "ZeroMQReceiver", storageLevel, supervisorStrategy)
+  }
+
   /**
    * Create an input stream that pulls messages form a Kafka Broker.
    * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
new file mode 100644
index 0000000000..5533c3cf1e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -0,0 +1,33 @@
+package spark.streaming.receivers
+
+import akka.actor.Actor
+import akka.zeromq._
+
+import spark.Logging
+
+/**
+ * A receiver that subscribes to a ZeroMQ stream.
+ */
+private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
+  subscribe: Subscribe,
+  bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
+  extends Actor with Receiver with Logging {
+
+  override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+    Connect(publisherUrl), subscribe)
+
+  def receive: Receive = {
+
+    case Connecting ⇒ logInfo("Connecting ...")
+
+    case m: ZMQMessage ⇒
+      logDebug("Received message for: " + m.firstFrameAsString)
+
+      // Drop the first frame before decoding, since it carries the topic
+      val bytes = m.frames.tail.map(_.payload)
+      pushBlock(bytesToObjects(bytes))
+
+    case Closed ⇒ logInfo("Received closed")
+
+  }
+}
-- 
GitLab