Commit 3ff97102 authored by Patrick Wendell

Adding Flume InputDStream

parent c36ca102
@@ -91,7 +91,8 @@ object SparkBuild extends Build {
"org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011",
"org.scalatest" %% "scalatest" % "1.6.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.9" % "test",
"com.novocode" % "junit-interface" % "0.8" % "test"
"com.novocode" % "junit-interface" % "0.8" % "test",
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile"
),
parallelExecution := false,
/* Workaround for issue #206 (fixed after SBT 0.11.0) */
......
package spark.streaming

import java.io.{ObjectInput, ObjectOutput, Externalizable}
import spark.storage.StorageLevel
import org.apache.flume.source.avro.AvroSourceProtocol
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
import java.net.InetSocketAddress
import collection.JavaConversions._
import spark.Utils
import java.nio.ByteBuffer

class FlumeInputDStream[T: ClassManifest](
  @transient ssc_ : StreamingContext,
  host: String,
  port: Int,
  storageLevel: StorageLevel
) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {

  override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
    new FlumeReceiver(id, host, port, storageLevel)
  }
}
/**
 * A wrapper class for AvroFlumeEvent that provides a custom serialization format.
 *
 * This is necessary because AvroFlumeEvent uses inner data structures
 * which are not serializable.
 */
class SparkFlumeEvent() extends Externalizable {
  var event: AvroFlumeEvent = new AvroFlumeEvent()

  /* De-serialize from bytes. */
  def readExternal(in: ObjectInput) {
    val bodyLength = in.readInt()
    val bodyBuff = new Array[Byte](bodyLength)
    // readFully blocks until the whole buffer is filled; a bare read() may return early
    in.readFully(bodyBuff)

    val numHeaders = in.readInt()
    val headers = new java.util.HashMap[CharSequence, CharSequence]

    for (i <- 0 until numHeaders) {
      val keyLength = in.readInt()
      val keyBuff = new Array[Byte](keyLength)
      in.readFully(keyBuff)
      val key: String = Utils.deserialize(keyBuff)

      val valLength = in.readInt()
      val valBuff = new Array[Byte](valLength)
      in.readFully(valBuff)
      val value: String = Utils.deserialize(valBuff)

      headers.put(key, value)
    }

    event.setBody(ByteBuffer.wrap(bodyBuff))
    event.setHeaders(headers)
  }
  /* Serialize to bytes. */
  def writeExternal(out: ObjectOutput) {
    val body = event.getBody.array()
    out.writeInt(body.length)
    out.write(body)

    val numHeaders = event.getHeaders.size()
    out.writeInt(numHeaders)
    for ((k, v) <- event.getHeaders) {
      val keyBuff = Utils.serialize(k.toString)
      out.writeInt(keyBuff.length)
      out.write(keyBuff)
      val valBuff = Utils.serialize(v.toString)
      out.writeInt(valBuff.length)
      out.write(valBuff)
    }
  }
}
object SparkFlumeEvent {
  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
    val event = new SparkFlumeEvent
    event.event = in
    event
  }
}
/** A simple server that implements Flume's Avro protocol. */
class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
  override def append(event: AvroFlumeEvent): Status = {
    receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
    Status.OK
  }

  override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
    events.foreach(event => receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event))
    Status.OK
  }
}
/**
 * A NetworkReceiver which listens for events using the
 * Flume Avro interface.
 */
class FlumeReceiver(
  streamId: Int,
  host: String,
  port: Int,
  storageLevel: StorageLevel
) extends NetworkReceiver[SparkFlumeEvent](streamId) {

  lazy val dataHandler = new DataHandler(this, storageLevel)

  protected override def onStart() {
    val responder = new SpecificResponder(
      classOf[AvroSourceProtocol], new FlumeEventServer(this))
    val server = new NettyServer(responder, new InetSocketAddress(host, port))
    dataHandler.start()
    server.start()
    logInfo("Flume receiver started")
  }

  protected override def onStop() {
    dataHandler.stop()
    logInfo("Flume receiver stopped")
  }

  override def getLocationConstraint = Some(host)
}
\ No newline at end of file
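
The wrapper exists because, as the doc comment above notes, AvroFlumeEvent itself does not survive Java serialization, which Spark uses to move received blocks around. Below is a minimal round-trip sketch; it is illustrative only, not part of this commit, and assumes the spark.streaming classes and the imports from the file above are in scope.

// Wrap an Avro event, push it through Java serialization, and read it back.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

val avroEvent = new AvroFlumeEvent()
avroEvent.setBody(ByteBuffer.wrap("hello".getBytes("UTF-8")))
avroEvent.setHeaders(new java.util.HashMap[CharSequence, CharSequence]())

val wrapped = SparkFlumeEvent.fromAvroFlumeEvent(avroEvent)

val bytes = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bytes)
oos.writeObject(wrapped)          // invokes writeExternal above
oos.close()

val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
val restored = ois.readObject().asInstanceOf[SparkFlumeEvent]   // invokes readExternal
assert(new String(restored.event.getBody.array(), "UTF-8") == "hello")
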
@@ -8,7 +8,6 @@ import spark.streaming.util.{RecurringTimer, SystemClock}
import spark.storage.StorageLevel
import java.nio.ByteBuffer
import java.util.concurrent.ArrayBlockingQueue
import akka.actor.{Props, Actor}
import akka.pattern.ask
@@ -63,6 +62,9 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
  /** This method will be called to stop receiving data. */
  protected def onStop()

  /** This method conveys a placement constraint (hostname) for this receiver. */
  def getLocationConstraint(): Option[String] = None

  /**
   * This method starts the receiver. First it accesses all the lazy members to
   * materialize them. Then it calls the user-defined onStart() method to start
@@ -151,6 +153,4 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
tracker ! DeregisterReceiver(streamId, msg)
}
}
}
@@ -98,7 +98,18 @@ class NetworkInputTracker(
  def startReceivers() {
    val receivers = networkInputStreams.map(_.createReceiver())
    val tempRDD = ssc.sc.makeRDD(receivers, receivers.size)

    // We only honor constraints if all receivers have them
    val hasLocationConstraints = receivers.map(_.getLocationConstraint().isDefined).reduce(_ && _)

    val tempRDD =
      if (hasLocationConstraints) {
        // Safe to unwrap: hasLocationConstraints guarantees every Option is defined
        val receiversWithConstraints = receivers.map(r => (r, Seq(r.getLocationConstraint().get)))
        ssc.sc.makeLocalityConstrainedRDD[NetworkReceiver[_]](receiversWithConstraints)
      } else {
        ssc.sc.makeRDD(receivers, receivers.size)
      }

    val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
      if (!iterator.hasNext) {
......
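
The placement logic above only honors constraints when every receiver reports one; a single unconstrained receiver falls back to ordinary makeRDD placement for the whole set. A standalone sketch of that rule (hypothetical host names, plain Scala rather than Spark code):

// One receiver pinned to a Flume host, one unpinned.
val constraints: Seq[Option[String]] = Seq(Some("flume-host-1"), None)

// Mirrors the reduce above: true only if every receiver defines a constraint.
val hasLocationConstraints = constraints.map(_.isDefined).reduce(_ && _)   // false here

// Only unwrap the Options when the flag is true, so .get is safe.
val preferredLocations =
  if (hasLocationConstraints) constraints.map(c => Seq(c.get)) else Seq.empty
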
@@ -31,6 +31,8 @@ class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: S
  var blockPushingThread: Thread = null

  override def getLocationConstraint = None

  def onStart() {
    // Open a socket to the target address and keep reading from it
    logInfo("Connecting to " + host + ":" + port)
......
@@ -34,6 +34,8 @@ class SocketReceiver[T: ClassManifest](
  lazy protected val dataHandler = new DataHandler(this, storageLevel)

  override def getLocationConstraint = None

  protected def onStart() {
    logInfo("Connecting to " + host + ":" + port)
    val socket = new Socket(host, port)
......
@@ -15,6 +15,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.hadoop.fs.Path
import java.util.UUID
import spark.util.MetadataCleaner
@@ -166,6 +167,16 @@ class StreamingContext private (
    inputStream
  }

  /**
   * Creates an input stream from a Flume source. The receiver starts an Avro
   * server on hostname:port and wraps each incoming event in a SparkFlumeEvent.
   */
  def flumeStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[SparkFlumeEvent] = {
    val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
    graph.addInputStream(inputStream)
    inputStream
  }

  def rawNetworkStream[T: ClassManifest](
      hostname: String,
      port: Int,
......
package spark.streaming.examples

import spark.util.IntParam
import spark.storage.StorageLevel
import spark.streaming._

object FlumeEventCount {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println(
        "Usage: FlumeEventCount <master> <host> <port> <batchMillis>")
      System.exit(1)
    }

    val Array(master, host, IntParam(port), IntParam(batchMillis)) = args

    // Create the context and set the batch size
    val ssc = new StreamingContext(master, "FlumeEventCount",
      Milliseconds(batchMillis))

    // Create a flume stream
    val stream = ssc.flumeStream(host, port, StorageLevel.MEMORY_ONLY)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
  }
}
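
FlumeEventCount only counts events. A hedged variation (not part of this commit) that also decodes each event body as UTF-8 text, assuming the upstream Flume agent's Avro sink is pointed at the host and port the receiver binds and sends textual payloads:

// Builds on the `stream` val above; e.event is the underlying AvroFlumeEvent.
val lines = stream.map(e =>
  java.nio.charset.Charset.forName("UTF-8").decode(e.event.getBody.duplicate()).toString)
lines.print()
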
package spark.streaming
import java.net.{SocketException, Socket, ServerSocket}
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -10,7 +10,14 @@ import spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.apache.flume.source.avro.AvroSourceProtocol
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.{specific, NettyTransceiver}
import org.apache.avro.ipc.specific.SpecificRequestor
import java.nio.ByteBuffer
import collection.JavaConversions._
import java.nio.charset.Charset
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -123,6 +130,54 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
    ssc.stop()
  }

  test("flume input stream") {
    // Set up the streaming context and input streams
    val ssc = new StreamingContext(master, framework, batchDuration)
    val flumeStream = ssc.flumeStream("localhost", 33333, StorageLevel.MEMORY_AND_DISK)
    val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
    ssc.registerOutputStream(outputStream)
    ssc.start()

    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    val input = Seq(1, 2, 3, 4, 5)

    // Connect to the receiver with an Avro client and push one event per batch
    val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333))
    val client = SpecificRequestor.getClient(
      classOf[AvroSourceProtocol], transceiver)

    for (i <- 0 until input.size) {
      val event = new AvroFlumeEvent
      event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
      event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
      client.append(event)
      Thread.sleep(500)
      clock.addToTime(batchDuration.milliseconds)
    }

    val startTime = System.currentTimeMillis()
    while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
      logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
      Thread.sleep(100)
    }
    Thread.sleep(1000)
    val timeTaken = System.currentTimeMillis() - startTime
    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
    logInfo("Stopping context")
    ssc.stop()

    // Verify that the data received by Spark Streaming matches what was sent
    val decoder = Charset.forName("UTF-8").newDecoder()
    assert(outputBuffer.size === input.length)
    for (i <- 0 until outputBuffer.size) {
      assert(outputBuffer(i).size === 1)
      val str = decoder.decode(outputBuffer(i).head.event.getBody)
      assert(str.toString === input(i).toString)
      assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
    }
  }

  test("file input stream") {
    // Create a temporary directory
......