diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index d5cda347a424f1aa187b383ed7dfe94dcb625d44..39db4be842d1a339a97ae40843d9c8348c461042 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -162,7 +162,8 @@ object SparkBuild extends Build {
   def streamingSettings = sharedSettings ++ Seq(
     name := "spark-streaming",
     libraryDependencies ++= Seq(
-      "com.github.sgroschupf" % "zkclient" % "0.1")
+      "com.github.sgroschupf" % "zkclient" % "0.1",
+      "junit" % "junit" % "4.8.1")
   ) ++ assemblySettings ++ extraAssemblySettings

   def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
diff --git a/streaming/src/main/scala/spark/streaming/JavaAPISuite.java b/streaming/src/main/scala/spark/streaming/JavaAPISuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..bcaaa4fa80892823bbccc9b6750057086eaf78b9
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/JavaAPISuite.java
@@ -0,0 +1,64 @@
+package spark.streaming;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import spark.api.java.JavaRDD;
+import spark.api.java.function.Function;
+import spark.api.java.function.Function2;
+import spark.streaming.api.java.JavaStreamingContext;
+
+import java.io.Serializable;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+  private transient JavaStreamingContext sc;
+
+  @Before
+  public void setUp() {
+    sc = new JavaStreamingContext("local[2]", "test", new Time(1000));
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.master.port");
+  }
+
+  @Test
+  public void simpleTest() {
+    sc.textFileStream("/var/log/syslog").print();
+    sc.start();
+  }
+
+  public static void main(String[] args) {
+    JavaStreamingContext sc = new JavaStreamingContext("local[2]", "test", new Time(1000));
+
+    sc.networkTextStream("localhost", 12345).map(new Function<String, Integer>() {
+      @Override
+      public Integer call(String s) throws Exception {
+        return s.length();
+      }
+    }).reduce(new Function2<Integer, Integer, Integer>() {
+      @Override
+      public Integer call(Integer i1, Integer i2) throws Exception {
+        return i1 + i2;
+      }
+    }).foreach(new Function2<JavaRDD<Integer>, Time, Void>() {
+      @Override
+      public Void call(JavaRDD<Integer> integerJavaRDD, Time t) throws Exception {
+        System.out.println("Contents @ " + t.toFormattedString());
+        for (int i: integerJavaRDD.collect()) {
+          System.out.println(i + "\n");
+        }
+        return null;
+      }
+    });
+
+    sc.start();
+  }
+}
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e9391642f85b0f37199de73bf98a29ba29a87066
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -0,0 +1,95 @@
+package spark.streaming.api.java
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+import spark.streaming._
+import spark.api.java.JavaRDD
+import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
+import java.util
+import spark.RDD
+
+class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) {
+  def print() = dstream.print()
+
+  // TODO move to type-specific implementations
+  def cache() : JavaDStream[T] = {
+    dstream.cache()
+  }
+
+  def count() : JavaDStream[Int] = {
+    dstream.count()
+  }
+
+  def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
+    dstream.countByWindow(windowTime, slideTime)
+  }
+
+  def compute(validTime: Time): JavaRDD[T] = {
+    dstream.compute(validTime) match {
+      case Some(rdd) => new JavaRDD(rdd)
+      case None => null
+    }
+  }
+
+  def context(): StreamingContext = dstream.context()
+
+  def window(windowTime: Time) = {
+    dstream.window(windowTime)
+  }
+
+  def window(windowTime: Time, slideTime: Time): JavaDStream[T] = {
+    dstream.window(windowTime, slideTime)
+  }
+
+  def tumble(batchTime: Time): JavaDStream[T] = {
+    dstream.tumble(batchTime)
+  }
+
+  def map[R](f: JFunction[T, R]): JavaDStream[R] = {
+    new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
+  }
+
+  def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = {
+    dstream.filter((x => f(x).booleanValue()))
+  }
+
+  def glom(): JavaDStream[JList[T]] = {
+    new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+  }
+
+  // TODO: Other map partitions
+  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
+    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+    new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
+  }
+
+  def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
+
+  def reduceByWindow(
+      reduceFunc: JFunction2[T, T, T],
+      invReduceFunc: JFunction2[T, T, T],
+      windowTime: Time,
+      slideTime: Time): JavaDStream[T] = {
+    dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
+  }
+
+  def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = {
+    new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
+  }
+
+  def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) = {
+    dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
+  }
+
+  def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = {
+    dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
+  }
+}
+
+object JavaDStream {
+  implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] =
+    new JavaDStream[T](dstream)
+
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
new file mode 100644
index 0000000000000000000000000000000000000000..46f8cffd0be71a62d5bf8eff0cd076909204fe5b
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -0,0 +1,29 @@
+package spark.streaming.api.java
+
+import scala.collection.JavaConversions._
+
+import spark.streaming._
+import dstream.SparkFlumeEvent
+import spark.storage.StorageLevel
+
+class JavaStreamingContext(val ssc: StreamingContext) {
+  def this(master: String, frameworkName: String, batchDuration: Time) =
+    this(new StreamingContext(master, frameworkName, batchDuration))
+
+  def textFileStream(directory: String): JavaDStream[String] = {
+    ssc.textFileStream(directory)
+  }
+
+  def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
+    ssc.networkTextStream(hostname, port)
+  }
+
+  def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
+    JavaDStream[SparkFlumeEvent] = {
+    ssc.flumeStream(hostname, port, storageLevel)
+  }
+
+  def start() = ssc.start()
+  def stop() = ssc.stop()
+
+}
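
A minimal usage sketch of the Java wrapper API introduced by this diff. It only uses calls that appear above (networkTextStream, map, reduceByWindow, print, start); the class name JavaNetworkCharCount, the package, localhost:9999, and the 2s/10s durations are illustrative assumptions, not part of the change.

// Sketch: maintain a sliding sum of incoming line lengths with the new Java API.
// Names, host/port, and durations below are hypothetical.
package spark.streaming.examples;

import spark.api.java.function.Function;
import spark.api.java.function.Function2;
import spark.streaming.Time;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;

public class JavaNetworkCharCount {
  public static void main(String[] args) {
    // 2-second batches on a local two-thread master, mirroring the test suite's setup.
    JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "CharCount", new Time(2000));

    // Turn each incoming text line into its length.
    JavaDStream<Integer> lengths = ssc.networkTextStream("localhost", 9999)
        .map(new Function<String, Integer>() {
          @Override
          public Integer call(String s) throws Exception {
            return s.length();
          }
        });

    // Sliding sum over a 10-second window, sliding every 2 seconds. The second function
    // is the inverse reduce: it subtracts values leaving the window, so the sum is
    // updated incrementally instead of being recomputed from scratch each slide.
    lengths.reduceByWindow(
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer a, Integer b) throws Exception { return a + b; }
        },
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer a, Integer b) throws Exception { return a - b; }
        },
        new Time(10000), new Time(2000)).print();

    ssc.start();
  }
}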