diff --git a/.gitignore b/.gitignore index 88d7b56181be7607e315b5f01990ed8951336dc9..155e785b01beb809a13c45c40d96f04f4dd6343b 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,5 @@ log/ spark-tests.log streaming-tests.log dependency-reduced-pom.xml +.ensime +.ensime_lucene diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index da82dfd10f290b70c7f8a2244b2065014acee558..9e8eaee756c56d109adf58a8ba3b4087c856ea8e 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -30,6 +30,7 @@ import spark.rdd.MapPartitionsRDD import spark.rdd.MapPartitionsWithIndexRDD import spark.rdd.PipedRDD import spark.rdd.SampledRDD +import spark.rdd.SubtractedRDD import spark.rdd.UnionRDD import spark.rdd.ZippedRDD import spark.storage.StorageLevel @@ -393,6 +394,26 @@ abstract class RDD[T: ClassManifest]( filter(f.isDefinedAt).map(f) } + /** + * Return an RDD with the elements from `this` that are not in `other`. + * + * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting + * RDD will be <= us. + */ + def subtract(other: RDD[T]): RDD[T] = + subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size))) + + /** + * Return an RDD with the elements from `this` that are not in `other`. + */ + def subtract(other: RDD[T], numPartitions: Int): RDD[T] = + subtract(other, new HashPartitioner(numPartitions)) + + /** + * Return an RDD with the elements from `this` that are not in `other`. + */ + def subtract(other: RDD[T], p: Partitioner): RDD[T] = new SubtractedRDD[T](this, other, p) + /** * Reduces the elements of this RDD using the specified commutative and associative binary operator. */ diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index d39767c3b3ce345585cb8c1fe5ef98b5d140058e..f40bb7935fac42310fa8c6d5b18734cba0360425 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -439,7 +439,7 @@ class SparkContext( } /** - * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for + * Broadcast a read-only variable to the cluster, returning a [[spark.broadcast.Broadcast]] object for * reading it in distributed functions. The variable will be sent to each cluster only once. */ def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal) diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala index da3cb2cd31395e022274a8010339546cacf949b8..ba00b6a8448f1d28d2bd4d257aca9a62db8b7539 100644 --- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala @@ -6,8 +6,8 @@ import spark.api.java.function.{Function => JFunction} import spark.util.StatCounter import spark.partial.{BoundedDouble, PartialResult} import spark.storage.StorageLevel - import java.lang.Double +import spark.Partitioner class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] { @@ -57,6 +57,27 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav */ def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions)) + /** + * Return an RDD with the elements from `this` that are not in `other`. + * + * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting + * RDD will be <= us. 
+ */ + def subtract(other: JavaDoubleRDD): JavaDoubleRDD = + fromRDD(srdd.subtract(other)) + + /** + * Return an RDD with the elements from `this` that are not in `other`. + */ + def subtract(other: JavaDoubleRDD, numPartitions: Int): JavaDoubleRDD = + fromRDD(srdd.subtract(other, numPartitions)) + + /** + * Return an RDD with the elements from `this` that are not in `other`. + */ + def subtract(other: JavaDoubleRDD, p: Partitioner): JavaDoubleRDD = + fromRDD(srdd.subtract(other, p)) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index df3af3817dc812a24288acf19ec07744e52fd84e..cfbdda88c0417e4dc1130768f374299edeba09ab 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -59,7 +59,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif /** * Return a new RDD containing only the elements that satisfy a predicate. */ - def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] = + def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue())) /** @@ -102,7 +102,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). */ - def combineByKey[C](createCombiner: Function[V, C], + def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner): JavaPairRDD[K, C] = { @@ -181,6 +181,27 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] = fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions))) + /** + * Return an RDD with the elements from `this` that are not in `other`. + * + * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting + * RDD will be <= us. + */ + def subtract(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] = + fromRDD(rdd.subtract(other)) + + /** + * Return an RDD with the elements from `this` that are not in `other`. + */ + def subtract(other: JavaPairRDD[K, V], numPartitions: Int): JavaPairRDD[K, V] = + fromRDD(rdd.subtract(other, numPartitions)) + + /** + * Return an RDD with the elements from `this` that are not in `other`. + */ + def subtract(other: JavaPairRDD[K, V], p: Partitioner): JavaPairRDD[K, V] = + fromRDD(rdd.subtract(other, p)) + /** * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine` * is true, Spark will group values of the same key together on the map side before the @@ -309,7 +330,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * Pass each value in the key-value pair RDD through a map function without changing the keys; * this also retains the original RDD's partitioning. 
*/ - def mapValues[U](f: Function[V, U]): JavaPairRDD[K, U] = { + def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = { implicit val cm: ClassManifest[U] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] fromRDD(rdd.mapValues(f)) diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala index 3ccd6f055ebef53804e0220c82e8ff69e054c325..301688889898e169e52b75951c159fb1b7a3159d 100644 --- a/core/src/main/scala/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaRDD.scala @@ -55,6 +55,26 @@ JavaRDDLike[T, JavaRDD[T]] { */ def union(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.union(other.rdd)) + /** + * Return an RDD with the elements from `this` that are not in `other`. + * + * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting + * RDD will be <= us. + */ + def subtract(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.subtract(other)) + + /** + * Return an RDD with the elements from `this` that are not in `other`. + */ + def subtract(other: JavaRDD[T], numPartitions: Int): JavaRDD[T] = + wrapRDD(rdd.subtract(other, numPartitions)) + + /** + * Return an RDD with the elements from `this` that are not in `other`. + */ + def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] = + wrapRDD(rdd.subtract(other, p)) + } object JavaRDD { diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 90b45cf875ea7fb42b8bde2ffbade7d37db1dff4..d884529d7a6f552227deb3989912efeff13cd5f2 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -12,7 +12,7 @@ import spark.storage.StorageLevel import com.google.common.base.Optional -trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround[T] { +trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def wrapRDD(rdd: RDD[T]): This implicit val classManifest: ClassManifest[T] @@ -82,12 +82,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround } /** - * Part of the workaround for SPARK-668; called in PairFlatMapWorkaround.java. + * Return a new RDD by first applying a function to all elements of this + * RDD, and then flattening the results. */ - private[spark] def doFlatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = { + def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala - def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType()) } @@ -110,8 +111,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround /** * Return a new RDD by applying a function to each partition of this RDD. 
*/ - def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]): - JavaPairRDD[K, V] = { + def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]): + JavaPairRDD[K2, V2] = { def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType()) } diff --git a/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java b/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java deleted file mode 100644 index 68b6fd6622742148761363045c6a06d0e8afeb74..0000000000000000000000000000000000000000 --- a/core/src/main/scala/spark/api/java/PairFlatMapWorkaround.java +++ /dev/null @@ -1,20 +0,0 @@ -package spark.api.java; - -import spark.api.java.JavaPairRDD; -import spark.api.java.JavaRDDLike; -import spark.api.java.function.PairFlatMapFunction; - -import java.io.Serializable; - -/** - * Workaround for SPARK-668. - */ -class PairFlatMapWorkaround<T> implements Serializable { - /** - * Return a new RDD by first applying a function to all elements of this - * RDD, and then flattening the results. - */ - public <K, V> JavaPairRDD<K, V> flatMap(PairFlatMapFunction<T, K, V> f) { - return ((JavaRDDLike <T, ?>) this).doFlatMap(f); - } -} diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 1cd68a2aa61ddcc874fd0245dd5b0eb52f4f3479..b7f167425f5a70b83a4924f1f9bf804487893fe1 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -33,6 +33,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor val waitingApps = new ArrayBuffer[ApplicationInfo] val completedApps = new ArrayBuffer[ApplicationInfo] + var firstApp: Option[ApplicationInfo] = None + val masterPublicAddress = { val envVar = System.getenv("SPARK_PUBLIC_DNS") if (envVar != null) envVar else ip @@ -167,7 +169,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor // Try to spread out each app among all the nodes, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(canUse(app, _)).sortBy(_.coresFree).reverse + .filter(canUse(app, _)).sortBy(_.coresFree).reverse val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // Number of cores to give on each node var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) @@ -190,7 +192,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } } else { // Pack each app into as few nodes as possible until we've assigned all its cores - for (worker <- workers if worker.coresFree > 0) { + for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { for (app <- waitingApps if app.coresLeft > 0) { if (canUse(app, worker)) { val coresToUse = math.min(worker.coresFree, app.coresLeft) @@ -245,6 +247,13 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor idToApp(app.id) = app actorToApp(driver) = app addressToApp(driver.path.address) = app + if (firstApp == None) { + firstApp = Some(app) + } + val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray + if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) { + logWarning("Could not find any workers with enough memory for " + firstApp.get.id) + } return app } diff 
--git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala index cd5b7d57f32f58c3c1a5e20e1dcc011d0650a7e1..d1451bc2124c581eff01dfae5277612ea5c995c7 100644 --- a/core/src/main/scala/spark/network/Connection.scala +++ b/core/src/main/scala/spark/network/Connection.scala @@ -198,7 +198,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { outbox.synchronized { outbox.addMessage(message) if (channel.isConnected) { - changeConnectionKeyInterest(SelectionKey.OP_WRITE) + changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ) } } } @@ -219,7 +219,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { def finishConnect() { try { channel.finishConnect - changeConnectionKeyInterest(SelectionKey.OP_WRITE) + changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ) logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending") } catch { case e: Exception => { @@ -239,8 +239,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { currentBuffers ++= chunk.buffers } case None => { - changeConnectionKeyInterest(0) - /*key.interestOps(0)*/ + changeConnectionKeyInterest(SelectionKey.OP_READ) return } } @@ -267,6 +266,23 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { } } } + + override def read() { + // We don't expect the other side to send anything; so, we just read to detect an error or EOF. + try { + val length = channel.read(ByteBuffer.allocate(1)) + if (length == -1) { // EOF + close() + } else if (length > 0) { + logWarning("Unexpected data read from SendingConnection to " + remoteConnectionManagerId) + } + } catch { + case e: Exception => + logError("Exception while reading SendingConnection to " + remoteConnectionManagerId, e) + callOnExceptionCallback(e) + close() + } + } } diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index 8139a2a40c66fc9a2f0f24adc09df30ed2df8c5f..78097502bca48d207a65563718079c638490d987 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -15,7 +15,7 @@ import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils -import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext} +import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext} /** @@ -42,7 +42,7 @@ class HadoopRDD[K, V]( keyClass: Class[K], valueClass: Class[V], minSplits: Int) - extends RDD[(K, V)](sc, Nil) { + extends RDD[(K, V)](sc, Nil) with Logging { // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) @@ -71,7 +71,7 @@ class HadoopRDD[K, V]( reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) // Register an on-task-completion callback to close the input stream. 
- context.addOnCompleteCallback(() => reader.close()) + context.addOnCompleteCallback{ () => close() } val key: K = reader.createKey() val value: V = reader.createValue() @@ -88,9 +88,6 @@ class HadoopRDD[K, V]( } gotNext = true } - if (finished) { - reader.close() - } !finished } @@ -104,6 +101,14 @@ class HadoopRDD[K, V]( gotNext = false (key, value) } + + private def close() { + try { + reader.close() + } catch { + case e: Exception => logWarning("Exception in RecordReader.close()", e) + } + } } override def getPreferredLocations(split: Partition): Seq[String] = { diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index ebd4c3f0e2d863ddfdf629b5666add02f182ee55..df2361025c75327837c4e13184e762261e4b7509 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -7,7 +7,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ -import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext} +import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext} private[spark] @@ -26,7 +26,8 @@ class NewHadoopRDD[K, V]( valueClass: Class[V], @transient conf: Configuration) extends RDD[(K, V)](sc, Nil) - with HadoopMapReduceUtil { + with HadoopMapReduceUtil + with Logging { // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) @@ -61,7 +62,7 @@ class NewHadoopRDD[K, V]( reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) // Register an on-task-completion callback to close the input stream. - context.addOnCompleteCallback(() => reader.close()) + context.addOnCompleteCallback(() => close()) var havePair = false var finished = false @@ -81,6 +82,14 @@ class NewHadoopRDD[K, V]( havePair = false return (reader.getCurrentKey, reader.getCurrentValue) } + + private def close() { + try { + reader.close() + } catch { + case e: Exception => logWarning("Exception in RecordReader.close()", e) + } + } } override def getPreferredLocations(split: Partition): Seq[String] = { diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala new file mode 100644 index 0000000000000000000000000000000000000000..daf9cc993cf42e9e963e986d73cdad0d6d708059 --- /dev/null +++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala @@ -0,0 +1,108 @@ +package spark.rdd + +import java.util.{HashSet => JHashSet} +import scala.collection.JavaConversions._ +import spark.RDD +import spark.Partitioner +import spark.Dependency +import spark.TaskContext +import spark.Partition +import spark.SparkEnv +import spark.ShuffleDependency +import spark.OneToOneDependency + +/** + * An optimized version of cogroup for set difference/subtraction. + * + * It is possible to implement this operation with just `cogroup`, but + * that is less efficient because all of the entries from `rdd2`, for + * both matching and non-matching values in `rdd1`, are kept in the + * JHashMap until the end. + * + * With this implementation, only the entries from `rdd1` are kept in-memory, + * and the entries from `rdd2` are essentially streamed, as we only need to + * touch each once to decide if the value needs to be removed. 
+ * + * This is particularly helpful when `rdd1` is much smaller than `rdd2`, as + * you can use `rdd1`'s partitioner/partition size and not worry about running + * out of memory because of the size of `rdd2`. + */ +private[spark] class SubtractedRDD[T: ClassManifest]( + @transient var rdd1: RDD[T], + @transient var rdd2: RDD[T], + part: Partitioner) extends RDD[T](rdd1.context, Nil) { + + override def getDependencies: Seq[Dependency[_]] = { + Seq(rdd1, rdd2).map { rdd => + if (rdd.partitioner == Some(part)) { + logInfo("Adding one-to-one dependency with " + rdd) + new OneToOneDependency(rdd) + } else { + logInfo("Adding shuffle dependency with " + rdd) + val mapSideCombinedRDD = rdd.mapPartitions(i => { + val set = new JHashSet[T]() + while (i.hasNext) { + set.add(i.next) + } + set.iterator + }, true) + // ShuffleDependency requires a tuple (k, v), which it will partition by k. + // We need this to partition to map to the same place as the k for + // OneToOneDependency, which means: + // - for already-tupled RDD[(A, B)], into getPartition(a) + // - for non-tupled RDD[C], into getPartition(c) + val part2 = new Partitioner() { + def numPartitions = part.numPartitions + def getPartition(key: Any) = key match { + case (k, v) => part.getPartition(k) + case k => part.getPartition(k) + } + } + new ShuffleDependency(mapSideCombinedRDD.map((_, null)), part2) + } + } + } + + override def getPartitions: Array[Partition] = { + val array = new Array[Partition](part.numPartitions) + for (i <- 0 until array.size) { + // Each CoGroupPartition will depend on rdd1 and rdd2 + array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) => + dependencies(j) match { + case s: ShuffleDependency[_, _] => + new ShuffleCoGroupSplitDep(s.shuffleId) + case _ => + new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)) + } + }.toList) + } + array + } + + override val partitioner = Some(part) + + override def compute(p: Partition, context: TaskContext): Iterator[T] = { + val partition = p.asInstanceOf[CoGroupPartition] + val set = new JHashSet[T] + def integrate(dep: CoGroupSplitDep, op: T => Unit) = dep match { + case NarrowCoGroupSplitDep(rdd, _, itsSplit) => + for (k <- rdd.iterator(itsSplit, context)) + op(k.asInstanceOf[T]) + case ShuffleCoGroupSplitDep(shuffleId) => + for ((k, _) <- SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index)) + op(k.asInstanceOf[T]) + } + // the first dep is rdd1; add all keys to the set + integrate(partition.deps(0), set.add) + // the second dep is rdd2; remove all of its keys from the set + integrate(partition.deps(1), set.remove) + set.iterator + } + + override def clearDependencies() { + super.clearDependencies() + rdd1 = null + rdd2 = null + } + +} \ No newline at end of file diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 1e4fbdb8742fdac8d33edb6b1e9ec1b3aaaea4d4..d9c2f9517be50ecd6e9370b5f55a4a3032dae6ab 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -11,6 +11,7 @@ import spark.TaskState.TaskState import spark.scheduler._ import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicLong +import java.util.{TimerTask, Timer} /** * The main TaskScheduler implementation, for running tasks on a cluster. 
Clients should first call @@ -22,6 +23,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // How often to check for speculative tasks val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong + // Threshold above which we warn user initial TaskSet may be starved + val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong val activeTaskSets = new HashMap[String, TaskSetManager] var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager] @@ -30,6 +33,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val taskIdToExecutorId = new HashMap[Long, String] val taskSetTaskIds = new HashMap[String, HashSet[Long]] + var hasReceivedTask = false + var hasLaunchedTask = false + val starvationTimer = new Timer(true) + // Incrementing Mesos task IDs val nextTaskId = new AtomicLong(0) @@ -94,6 +101,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext) activeTaskSets(taskSet.id) = manager activeTaskSetsQueue += manager taskSetTaskIds(taskSet.id) = new HashSet[Long]() + + if (hasReceivedTask == false) { + starvationTimer.scheduleAtFixedRate(new TimerTask() { + override def run() { + if (!hasLaunchedTask) { + logWarning("Initial job has not accepted any resources; " + + "check your cluster UI to ensure that workers are registered") + } else { + this.cancel() + } + } + }, STARVATION_TIMEOUT, STARVATION_TIMEOUT) + } + hasReceivedTask = true; } backend.reviveOffers() } @@ -150,6 +171,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } while (launchedTask) } + if (tasks.size > 0) { + hasLaunchedTask = true + } return tasks } } @@ -235,7 +259,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } override def defaultParallelism() = backend.defaultParallelism() - + // Check for speculatable tasks in all our active jobs. 
def checkSpeculatableTasks() { var shouldRevive = false diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala index a342d378ffab263f8dc23b58ec7e7f1f204bd045..dafa90671214b4d803f2775534ebee9f5325cc70 100644 --- a/core/src/main/scala/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/spark/util/MetadataCleaner.scala @@ -38,7 +38,7 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging object MetadataCleaner { - def getDelaySeconds = System.getProperty("spark.cleaner.delay", "-1").toInt - def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.delay", delay.toString) } + def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt + def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.ttl", delay.toString) } } diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala index 03559751bc46bea21e8d455323459cc91d73ea9c..835822edb2300969ea40497e9e954f85921ffd5e 100644 --- a/core/src/main/scala/spark/util/Vector.scala +++ b/core/src/main/scala/spark/util/Vector.scala @@ -11,12 +11,16 @@ class Vector(val elements: Array[Double]) extends Serializable { return Vector(length, i => this(i) + other(i)) } + def add(other: Vector) = this + other + def - (other: Vector): Vector = { if (length != other.length) throw new IllegalArgumentException("Vectors of different length") return Vector(length, i => this(i) - other(i)) } + def subtract(other: Vector) = this - other + def dot(other: Vector): Double = { if (length != other.length) throw new IllegalArgumentException("Vectors of different length") @@ -61,10 +65,16 @@ class Vector(val elements: Array[Double]) extends Serializable { this } + def addInPlace(other: Vector) = this +=other + def * (scale: Double): Vector = Vector(length, i => this(i) * scale) + def multiply (d: Double) = this * d + def / (d: Double): Vector = this * (1 / d) + def divide (d: Double) = this / d + def unary_- = this * -1 def sum = elements.reduceLeft(_ + _) diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index 0e2585daa434cdad2c8deb14bd088072a69dfe31..caa4ba3a3705af5587099951c1914a03663b5ea8 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -217,6 +217,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(grouped.collect.size === 1) } } + + test("recover from node failures with replication") { + import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity} + DistributedSuite.amMaster = true + // Using more than two nodes so we don't have a symmetric communication pattern and might + // cache a partially correct list of peers. + sc = new SparkContext("local-cluster[3,1,512]", "test") + for (i <- 1 to 3) { + val data = sc.parallelize(Seq(true, false, false, false), 4) + data.persist(StorageLevel.MEMORY_ONLY_2) + + assert(data.count === 4) + assert(data.map(markNodeIfIdentity).collect.size === 4) + assert(data.map(failOnMarkedIdentity).collect.size === 4) + + // Create a new replicated RDD to make sure that cached peer information doesn't cause + // problems. 
+ val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2) + assert(data2.count === 2) + } + } } object DistributedSuite { diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 92c3f6741668b510af22020d8d34866cb5f11a7c..77e0eab8299b9a1278a69749fcae8e01328730e5 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -234,6 +234,32 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { assert(rdd.keys.collect().toList === List(1, 2)) assert(rdd.values.collect().toList === List("a", "b")) } + + test("subtract") { + sc = new SparkContext("local", "test") + val a = sc.parallelize(Array(1, 2, 3), 2) + val b = sc.parallelize(Array(2, 3, 4), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set(1)) + assert(c.partitions.size === a.partitions.size) + } + + test("subtract with narrow dependency") { + sc = new SparkContext("local", "test") + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p) + println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList) + // more splits/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set((1, "a"), (3, "c"))) + assert(c.partitioner.get === p) + } } object ShuffleSuite { diff --git a/docs/configuration.md b/docs/configuration.md index f1ca77aa7826a9da39153d6ffe4ac74198b65a0f..04eb6daaa5d016a6ecdaa51575011882bb328bb1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -183,7 +183,7 @@ Apart from these, the following properties are also available, and may be useful </tr> <tr> <td>spark.broadcast.factory</td> - <td>spark.broadcast. HttpBroadcastFactory</td> + <td>spark.broadcast.HttpBroadcastFactory</td> <td> Which broadcast implementation to use. </td> @@ -244,10 +244,10 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> - <td>spark.cleaner.delay</td> + <td>spark.cleaner.ttl</td> <td>(disable)</td> <td> - Duration (minutes) of how long Spark will remember any metadata (stages generated, tasks generated, etc.). + Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be forgetten. This is useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming applications). Note that any RDD that persists in memory for more than this duration will be cleared as well. diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 71e1bd4aab20fa6cf8949241b89b40d0633c428f..ded43e67cd707c411d4fefb7da80d9b9cc2a7953 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -207,7 +207,7 @@ ssc.stop() {% endhighlight %} # Example -A simple example to start off is the [NetworkWordCount](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala). This example counts the words received from a network server every second. Given below is the relevant sections of the source code. 
You can find the full source code in `<Spark repo>/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala` . +A simple example to start off is the [NetworkWordCount](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala). This example counts the words received from a network server every second. Given below are the relevant sections of the source code. You can find the full source code in `<Spark repo>/streaming/src/main/scala/spark/streaming/examples/NetworkWordCount.scala` . {% highlight scala %} import spark.streaming.{Seconds, StreamingContext} @@ -216,7 +216,7 @@ import spark.streaming.StreamingContext._ // Create the context and set up a network input stream to receive from a host:port val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1)) -val lines = ssc.networkTextStream(args(1), args(2).toInt) +val lines = ssc.socketTextStream(args(1), args(2).toInt) // Split the lines into words, count them, and print some of the counts on the master val words = lines.flatMap(_.split(" ")) @@ -227,6 +227,8 @@ wordCounts.print() ssc.start() {% endhighlight %} +The `socketTextStream` returns a DStream of lines received from a TCP socket-based source. The `lines` DStream is _transformed_ into a DStream of words using the `flatMap` operation, where each line is split into words. This `words` DStream is then mapped to a DStream of `(word, 1)` pairs, which is finally reduced to get the word counts. `wordCounts.print()` will print 10 of the counts generated every second. + To run this example on your local machine, you need to first run a Netcat server by using {% highlight bash %} @@ -335,7 +337,7 @@ For a Spark Streaming application running on a cluster to be stable, the process A good approach to figure out the right batch size for your application is to test it with a conservative batch size (say, 5-10 seconds) and a low data rate. To verify whether the system is able to keep up with data rate, you can check the value of the end-to-end delay experienced by each processed batch (in the Spark master logs, find the line having the phrase "Total delay"). If the delay is maintained to be less than the batch size, then system is stable. Otherwise, if the delay is continuously increasing, it means that the system is unable to keep up and it therefore unstable. Once you have an idea of a stable configuration, you can try increasing the data rate and/or reducing the batch size. Note that momentary increase in the delay due to temporary data rate increases maybe fine as long as the delay reduces back to a low value (i.e., less than batch size). ## 24/7 Operation -By default, Spark does not forget any of the metadata (RDDs generated, stages processed, etc.). But for a Spark Streaming application to operate 24/7, it is necessary for Spark to do periodic cleanup of it metadata. This can be enabled by setting the Java system property `spark.cleaner.delay` to the number of seconds you want any metadata to persist. For example, setting `spark.cleaner.delay` to 600 would cause Spark periodically cleanup all metadata and persisted RDDs that are older than 10 minutes. Note, that this property needs to be set before the SparkContext is created. +By default, Spark does not forget any of the metadata (RDDs generated, stages processed, etc.). But for a Spark Streaming application to operate 24/7, it is necessary for Spark to do periodic cleanup of its metadata.
This can be enabled by setting the Java system property `spark.cleaner.ttl` to the number of seconds you want any metadata to persist. For example, setting `spark.cleaner.ttl` to 600 would cause Spark to periodically clean up all metadata and persisted RDDs that are older than 10 minutes. Note that this property needs to be set before the SparkContext is created. This value is closely tied with any window operation that is being used. Any window operation would require the input data to be persisted in memory for at least the duration of the window. Hence it is necessary to set the delay to at least the value of the largest window operation used in the Spark Streaming application. If this delay is set too low, the application will throw an exception saying so. @@ -347,15 +349,19 @@ Tuning the memory usage and GC behavior of Spark applications have been discusse * **Concurrent garbage collector**: Using the concurrent mark-and-sweep GC further minimizes the variability of GC pauses. Even though concurrent GC is known to reduce the overall processing throughput of the system, its use is still recommended to achieve more consistent batch processing times. # Fault-tolerance Properties -There are two aspects to fault-tolerance - failure of a worker node and that of a driver node. In this section, we are going to discuss the fault-tolerance behavior and the semantics of the processed data. +In this section, we are going to discuss the behavior of a Spark Streaming application in the event of a node failure. To understand this, let us remember the basic fault-tolerance properties of Spark's RDDs. + + 1. An RDD is an immutable and deterministically re-computable distributed dataset. Each RDD remembers the lineage of deterministic operations that were used on a fault-tolerant input dataset to create it. + 1. If any partition of an RDD is lost due to a worker node failure, then that partition can be re-computed from the original fault-tolerant dataset using the lineage of operations. + +Since all data transformations in Spark Streaming are based on RDD operations, as long as the input dataset is present, all intermediate data can be recomputed. Keeping these properties in mind, we are going to discuss the failure semantics in more detail. ## Failure of a Worker Node -In case of the worker node failure, none of the processed data will be lost because -1. All the input data is fault-tolerant (either the data is on HDFS, or it replicated Spark Streaming if received from the network) -1. All intermediate data is expressed as RDDs with their lineage to the input data, which allows Spark to recompute any part of the intermediate data is lost to worker node failure. +There are two failure behaviors based on which input sources are used. -If the worker node where a network data receiver is running fails, then the receiver will be restarted on a different node and it will continue to receive data. However, data that was accepted by the receiver but not yet replicated to other Spark nodes may be lost, which is a fraction of a second of data. +1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can be re-computed and therefore no data will be lost due to any failure. +1. _Using any input source that receives data through a network_ - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2).
So if a worker node fails, then the system can recompute the lost data from the leftover copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data. Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreach`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations. @@ -372,11 +378,19 @@ All this is periodically saved in the file `<checkpoint directory>/graph` where val ssc = new StreamingContext(checkpointDirectory) {% endhighlight %} -Calling `ssc.start()` on this new context will restart the receivers and the stream computations. +On calling `ssc.start()` on this new context, the following steps are taken by the system: + +1. Schedule the transformations and output operations for all the time steps between the time when the driver failed and when it was restarted. This is also done for those time steps that were scheduled but not processed due to the failure. This will make the system recompute all the intermediate data from the checkpointed RDD files, etc. +1. Restart the network receivers, if any, and continue receiving new data. + +In the current _alpha_ release, there are two different failure behaviors based on which input sources are used. + +1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can be re-computed and therefore no data will be lost due to any failure. +1. _Using any input source that receives data through a network_ - As mentioned above, the received input data is replicated in memory to multiple nodes. Since all the data in the Spark worker's memory is lost when the Spark driver fails, the past input data will not be accessible when the driver recovers. Hence, if stateful and window-based operations are used (like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state will not be recovered completely. -In case of stateful operations (that is, `updateStateByKey` and `reduceByKeyAndWindow` with inverse function), the intermediate data at the time of failure also needs to be recomputed.This requires two things - (i) the RDD checkpoints and (ii) the data received since the checkpoints. In the current _alpha_ release, the input data received from the network is not saved durably across driver failures (the data is only replicated in memory of the worker processes and gets lost when the driver fails). Only with file input streams (where the data is already durably stored) is the recovery from driver failure complete and all intermediate data is recomputed. In a future release, this will be true for all input streams.
Note that for non-stateful operations, with _all_ input streams, the system will recover and continue receiving and processing new data. +In future releases, this behaviour will be fixed for all input sources, that is, all data will be recovered irrespective of which input sources are used. Note that for non-stateful transformations like `map`, `count`, and `reduceByKey`, with _all_ input streams, the system, upon restarting, will continue to receive and process new data. -To understand the behavior of the system under driver failure, lets consider what will happen with a file input stream Specifically, in the case of the file input stream, it will correctly identify new files that were created while the driver was down and process them in the same way as it would have if the driver had not failed. To explain further in the case of file input stream, we shall use an example. Lets say, files are being generated every second, and a Spark Streaming program reads every new file and output the number of lines in the file. This is what the sequence of outputs would be with and without a driver failure. +To better understand the behavior of the system under driver failure with an HDFS source, let's consider what will happen with a file input stream. Specifically, in the case of the file input stream, it will correctly identify new files that were created while the driver was down and process them in the same way as it would have if the driver had not failed. To explain this further in the case of a file input stream, we shall use an example. Let's say files are being generated every second, and a Spark Streaming program reads every new file and outputs the number of lines in the file. This is what the sequence of outputs would be with and without a driver failure. <table class="table"> <!-- Results table headers --> @@ -450,6 +464,55 @@ To understand the behavior of the system under driver failure, lets consider wha If the driver had crashed in the middle of the processing of time 3, then it will process time 3 and output 30 after recovery. +# Java API + +Similar to [Spark's Java API](java-programming-guide.html), we also provide a Java API for Spark Streaming which allows all its features to be accessible from a Java program. This is defined in the [spark.streaming.api.java](api/streaming/index.html#spark.streaming.api.java.package) package and includes [JavaStreamingContext](api/streaming/index.html#spark.streaming.api.java.JavaStreamingContext) and [JavaDStream](api/streaming/index.html#spark.streaming.api.java.JavaDStream) classes that provide the same methods as their Scala counterparts, but take Java functions (that is, Function and Function2) and return Java data and collection types. Some of the key points to note are: + +1. Functions for transformations must be implemented as subclasses of [Function](api/core/index.html#spark.api.java.function.Function) and [Function2](api/core/index.html#spark.api.java.function.Function2) +1. Unlike the Scala API, the Java API handles DStreams for key-value pairs using a separate [JavaPairDStream](api/streaming/index.html#spark.streaming.api.java.JavaPairDStream) class (similar to [JavaRDD and JavaPairRDD](java-programming-guide.html#rdd-classes)). DStream functions like `map` and `filter` are implemented separately by JavaDStreams and JavaPairDStream to return DStreams of appropriate types. + +Spark's [Java Programming Guide](java-programming-guide.html) gives more ideas about using the Java API.
To extend the ideas presented for the RDDs to DStreams, we present parts of the Java version of the same NetworkWordCount example presented above. The full source code is given at `<spark repo>/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java` + +The streaming context and the socket stream from the input source are created by using a `JavaStreamingContext`, which has the same parameters and provides the same input streams as its Scala counterpart. + +{% highlight java %} +JavaStreamingContext ssc = new JavaStreamingContext(mesosUrl, "NetworkWordCount", Seconds(1)); +JavaDStream<String> lines = ssc.socketTextStream(ip, port); +{% endhighlight %} + + +Then the `lines` are split into words by using the `flatMap` function and [FlatMapFunction](api/core/index.html#spark.api.java.function.FlatMapFunction). + +{% highlight java %} +JavaDStream<String> words = lines.flatMap( + new FlatMapFunction<String, String>() { + @Override + public Iterable<String> call(String x) { + return Lists.newArrayList(x.split(" ")); + } + }); +{% endhighlight %} + +The `words` DStream is then mapped to a [JavaPairDStream](api/streaming/index.html#spark.streaming.api.java.JavaPairDStream) of `(word, 1)` pairs using `map` and [PairFunction](api/core/index.html#spark.api.java.function.PairFunction). This is reduced using `reduceByKey` and [Function2](api/core/index.html#spark.api.java.function.Function2). + +{% highlight java %} +JavaPairDStream<String, Integer> wordCounts = words.map( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2<String, Integer> call(String s) throws Exception { + return new Tuple2<String, Integer>(s, 1); + } + }).reduceByKey( + new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + }); +{% endhighlight %} + + + # Where to Go from Here -* Documentation - [Scala and Java](api/streaming/index.html) +* Documentation - [Scala](api/streaming/index.html#spark.streaming.package) and [Java](api/streaming/index.html#spark.streaming.api.java.package) * More examples - [Scala](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples) and [Java](https://github.com/mesos/spark/tree/master/examples/src/main/java/spark/streaming/examples) diff --git a/examples/pom.xml b/examples/pom.xml index f6125444e2a46e326482a6e91e860f37b37f2a8a..7d975875fac3afe8e733c3549bcc4437d0f236f6 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -22,7 +22,7 @@ <dependency> <groupId>com.twitter</groupId> <artifactId>algebird-core_2.9.2</artifactId> - <version>0.1.9</version> + <version>0.1.8</version> </dependency> <dependency> <groupId>org.scalatest</groupId> diff --git a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java index 07342beb02ea67648e79c1737fdf70940448c6d0..0e9eadd01b7111b7c98258a2d3e252bbb09c7618 100644 --- a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java @@ -23,7 +23,7 @@ import spark.streaming.api.java.JavaStreamingContext; */ public class JavaNetworkWordCount { public static void main(String[] args) { - if (args.length < 2) { + if (args.length < 3) { System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + "In local mode, <master> should be 'local[n]' with n > 1"); System.exit(1); diff --git
a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala index 346151c147483974279fcaa2276e8c350674af71..76293fbb96bf7be6b2a61af1b597a3e4b2d6ddca 100644 --- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala @@ -131,8 +131,7 @@ object ActorWordCount { val Seq(master, host, port) = args.toSeq // Create the context and set the batch size - val ssc = new StreamingContext(master, "ActorWordCount", - Seconds(10)) + val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2)) /* * Following is the use of actorStream to plug in custom actor as receiver diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala index 7ff70ae2e57ebf95da882f281809a8898a141c82..5ac6d19b349da7dd9e4be4d019db974e63107038 100644 --- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala @@ -16,7 +16,7 @@ import spark.streaming.StreamingContext._ */ object NetworkWordCount { def main(args: Array[String]) { - if (args.length < 2) { + if (args.length < 3) { System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + "In local mode, <master> should be 'local[n]' with n > 1") System.exit(1) diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala index 2eec777c54e8d21da0a686c3768202ba1c0a1b93..66e709b7a333836fd1950ac3da50d8b87194fea1 100644 --- a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala +++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala @@ -37,7 +37,7 @@ object RawNetworkGrep { RawTextHelper.warmUp(ssc.sc) val rawStreams = (1 to numStreams).map(_ => - ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray + ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray val union = ssc.union(rawStreams) union.filter(_.contains("the")).count().foreach(r => println("Grep count: " + r.collect().mkString)) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5e7c3b5e3aca2b145748c956ac4b3cf5a87a8b33..22bdc93602ea55992e64c91cc00e691d7d1ea8f2 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -155,7 +155,7 @@ object SparkBuild extends Build { def examplesSettings = sharedSettings ++ Seq( name := "spark-examples", - libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.9") + libraryDependencies ++= Seq("com.twitter" % "algebird-core_2.9.2" % "0.1.8") ) def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel") diff --git a/run b/run index 2d8a737e01f57654fe7aec76e4948a550eefe31e..6b2d84d48dbf034613c34c0b089fe4f380a450ba 100755 --- a/run +++ b/run @@ -25,6 +25,26 @@ if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" SPARK_JAVA_OPTS=$SPARK_DAEMON_JAVA_OPTS # Empty by default fi + +# Add java opts for master, worker, executor. 
The opts maybe null +case "$1" in + 'spark.deploy.master.Master') + SPARK_JAVA_OPTS+=" $SPARK_MASTER_OPTS" + ;; + 'spark.deploy.worker.Worker') + SPARK_JAVA_OPTS+=" $SPARK_WORKER_OPTS" + ;; + 'spark.executor.StandaloneExecutorBackend') + SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" + ;; + 'spark.executor.MesosExecutorBackend') + SPARK_JAVA_OPTS+=" $SPARK_EXECUTOR_OPTS" + ;; + 'spark.repl.Main') + SPARK_JAVA_OPTS+=" $SPARK_REPL_OPTS" + ;; +esac + if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then if [ `command -v scala` ]; then RUNNER="scala" diff --git a/streaming/pom.xml b/streaming/pom.xml index d78c39da0d1a60a1bdbf928c3981392f1bbc11c1..92b17fc3af6f4c711515c491b3f5634a173fa489 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -20,6 +20,17 @@ <id>lib</id> <url>file://${project.basedir}/lib</url> </repository> + <repository> + <id>akka-repo</id> + <name>Akka Repository</name> + <url>http://repo.akka.io/releases</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> </repositories> <dependencies> @@ -53,11 +64,10 @@ <version>3.0.3</version> </dependency> <dependency> - <groupId>org.twitter4j</groupId> - <artifactId>twitter4j-core</artifactId> - <version>3.0.3</version> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-zeromq</artifactId> + <version>2.0.3</version> </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.version}</artifactId> diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala index 64972fd5cd79f81be8624a378fb6f29e7ee31087..b159d26c02b2d4383253110d1d589fc2a6b4261b 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -141,7 +141,7 @@ class NetworkInputTracker( } // Run the dummy Spark job to ensure that all slaves have registered. // This avoids all the receivers to be scheduled on the same node. - //ssc.sparkContext.makeRDD(1 to 100, 100).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() // Distribute the receivers and start them ssc.sparkContext.runJob(tempRDD, startReceiver) diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index d0430b3f3eef8867c33ea9862df694958224dce8..25c67b279b7d2feb38a5b5d520f3ed0ab73d5f5c 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -170,7 +170,8 @@ class StreamingContext private ( * should be same. */ def actorStream[T: ClassManifest]( - props: Props, name: String, + props: Props, + name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = { networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) @@ -179,19 +180,20 @@ class StreamingContext private ( /** * Create an input stream that receives messages pushed by a zeromq publisher. 
* @param publisherUrl Url of remote zeromq publisher - * @param zeroMQ topic to subscribe to + * @param subscribe topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence * of byte thus it needs the converter(which might be deserializer of bytes) * to translate from sequence of sequence of bytes, where sequence refer to a frame * and sub sequence refer to its payload. * @param storageLevel RDD storage level. Defaults to memory-only. */ - def zeroMQStream[T: ClassManifest](publisherUrl:String, + def zeroMQStream[T: ClassManifest]( + publisherUrl:String, subscribe: Subscribe, bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, - supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = { - + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy + ): DStream[T] = { actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), "ZeroMQReceiver", storageLevel, supervisorStrategy) } @@ -283,7 +285,7 @@ class StreamingContext private ( * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects in the received blocks */ - def rawNetworkStream[T: ClassManifest]( + def rawSocketStream[T: ClassManifest]( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 @@ -352,7 +354,7 @@ class StreamingContext private ( def twitterStream( username: String, password: String, - filters: Seq[String], + filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[Status] = { val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel) diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala index 51efe6cae84942b0d4b246d16e68a488045eca87..4d93f0a5f729e48cbda0c18f6104449ccb48abc7 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala @@ -17,9 +17,7 @@ import spark.RDD * * This class contains the basic operations available on all DStreams, such as `map`, `filter` and * `window`. In addition, [[spark.streaming.api.java.JavaPairDStream]] contains operations available - * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations - * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through - * implicit conversions when `spark.streaming.StreamingContext._` is imported. + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. 
* * DStreams internally is characterized by a few basic properties: * - A list of other DStreams that the DStream depends on diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index d2a0ba725fdbfd8d0e485b2b9c49342c2a07a129..f3b40b5b88ef9dd80f2f717ccdd2bf151ddd6f00 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -1,16 +1,26 @@ package spark.streaming.api.java -import scala.collection.JavaConversions._ -import java.lang.{Long => JLong, Integer => JInt} - import spark.streaming._ -import dstream._ +import receivers.{ActorReceiver, ReceiverSupervisorStrategy} +import spark.streaming.dstream._ import spark.storage.StorageLevel + import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import spark.api.java.{JavaSparkContext, JavaRDD} + import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} + +import twitter4j.Status + +import akka.actor.Props +import akka.actor.SupervisorStrategy +import akka.zeromq.Subscribe + +import scala.collection.JavaConversions._ + +import java.lang.{Long => JLong, Integer => JInt} import java.io.InputStream import java.util.{Map => JMap} -import spark.api.java.{JavaSparkContext, JavaRDD} /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -128,7 +138,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param storageLevel Storage level to use for storing the received objects * (default: StorageLevel.MEMORY_AND_DISK_SER_2) */ - def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel) + def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel) : JavaDStream[String] = { ssc.socketTextStream(hostname, port, storageLevel) } @@ -186,13 +196,13 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects in the received blocks */ - def rawNetworkStream[T]( + def rawSocketStream[T]( hostname: String, port: Int, storageLevel: StorageLevel): JavaDStream[T] = { implicit val cmt: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel)) + JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel)) } /** @@ -204,10 +214,10 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param port Port to connect to for receiving data * @tparam T Type of the objects in the received blocks */ - def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = { + def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = { implicit val cmt: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port)) + JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port)) } /** @@ -246,11 +256,178 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param hostname Hostname of the slave machine to which the flume data will be sent * @param port Port of the slave machine to which the flume data will be sent */ - def flumeStream(hostname: String, port: Int): - JavaDStream[SparkFlumeEvent] = { + def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { 
    ssc.flumeStream(hostname, port)
  }
 
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param username Twitter username
+   * @param password Twitter password
+   * @param filters Set of filter strings to get only those tweets that match them
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def twitterStream(
+      username: String,
+      password: String,
+      filters: Array[String],
+      storageLevel: StorageLevel
+    ): JavaDStream[Status] = {
+    ssc.twitterStream(username, password, filters, storageLevel)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param username Twitter username
+   * @param password Twitter password
+   * @param filters Set of filter strings to get only those tweets that match them
+   */
+  def twitterStream(
+      username: String,
+      password: String,
+      filters: Array[String]
+    ): JavaDStream[Status] = {
+    ssc.twitterStream(username, password, filters)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param username Twitter username
+   * @param password Twitter password
+   */
+  def twitterStream(
+      username: String,
+      password: String
+    ): JavaDStream[Status] = {
+    ssc.twitterStream(username, password)
+  }
+
+  /**
+   * Create an input stream with any arbitrary user implemented actor receiver.
+   * @param props Props object defining creation of the actor
+   * @param name Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @note An important point to note:
+   *       Since the actor may exist outside the Spark framework, it is the user's responsibility
+   *       to ensure type safety, i.e. the parametrized type of the data received and that of
+   *       actorStream should be the same.
+   */
+  def actorStream[T](
+      props: Props,
+      name: String,
+      storageLevel: StorageLevel,
+      supervisorStrategy: SupervisorStrategy
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream with any arbitrary user implemented actor receiver.
+   * @param props Props object defining creation of the actor
+   * @param name Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @note An important point to note:
+   *       Since the actor may exist outside the Spark framework, it is the user's responsibility
+   *       to ensure type safety, i.e. the parametrized type of the data received and that of
+   *       actorStream should be the same.
+   */
+  def actorStream[T](
+      props: Props,
+      name: String,
+      storageLevel: StorageLevel
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.actorStream[T](props, name, storageLevel)
+  }
+
+  /**
+   * Create an input stream with any arbitrary user implemented actor receiver.
+   * @param props Props object defining creation of the actor
+   * @param name Name of the actor
+   *
+   * @note An important point to note:
+   *       Since the actor may exist outside the Spark framework, it is the user's responsibility
+   *       to ensure type safety, i.e. the parametrized type of the data received and that of
+   *       actorStream should be the same.
+ */ + def actorStream[T]( + props: Props, + name: String + ): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.actorStream[T](props, name) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel Storage level to use for storing the received objects + */ + def zeroMQStream[T]( + publisherUrl:String, + subscribe: Subscribe, + bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], + storageLevel: StorageLevel, + supervisorStrategy: SupervisorStrategy + ): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + def zeroMQStream[T]( + publisherUrl:String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel + ): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. 
+ */ + def zeroMQStream[T]( + publisherUrl:String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] + ): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + ssc.zeroMQStream[T](publisherUrl, subscribe, fn) + } + /** * Registers an output stream that will be computed every interval */ diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala index a4db44a608e08ff8c22c16b5c2c147bf4cc5085f..3c5d43a60955a38273aabd224073c8fabf2830d4 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala @@ -29,7 +29,7 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex false // Time not valid } else { // Time is valid, but check it it is more than lastValidTime - if (lastValidTime == null || lastValidTime <= time) { + if (lastValidTime != null && time < lastValidTime) { logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime) } lastValidTime = time diff --git a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala index 024bf3bea47832867e2ce419e41e56786fa2c73d..6b310bc0b611c7a6260042e3ddc555a59bed28a2 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/QueueInputDStream.scala @@ -7,6 +7,7 @@ import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer import spark.streaming.{Time, StreamingContext} +private[streaming] class QueueInputDStream[T: ClassManifest]( @transient ssc: StreamingContext, val queue: Queue[RDD[T]], diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 4530af5f6af5037b035b27bf3b305fdc7767b5d1..3bed500f73e84548a2f470a414ea68cfbb3df1ac 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -24,10 +24,16 @@ import spark.streaming.api.java.JavaStreamingContext; import spark.streaming.JavaTestUtils; import spark.streaming.JavaCheckpointTestUtils; import spark.streaming.dstream.KafkaPartitionKey; +import spark.streaming.InputStreamsSuite; import java.io.*; import java.util.*; +import akka.actor.Props; +import akka.zeromq.Subscribe; + + + // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. 
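A minimal Scala sketch of how the zeroMQStream API introduced above might be consumed; the local StreamingContext, publisher URL, and topic name are illustrative placeholders, not part of the patch. The converter mirrors the expected Seq[Seq[Byte]] => Iterator[T] shape: each ZeroMQ message arrives as a sequence of frames, and each frame's bytes are turned into a String.

import akka.zeromq.Subscribe
import spark.streaming.{Seconds, StreamingContext}

object ZeroMQStreamSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "ZeroMQStreamSketch", Seconds(1))

    // Turn every frame of a multi-frame ZeroMQ message into a String.
    def bytesToStrings(frames: Seq[Seq[Byte]]): Iterator[String] =
      frames.map(frame => new String(frame.toArray, "UTF-8")).iterator

    // storageLevel and supervisorStrategy fall back to the defaults declared
    // on StreamingContext.zeroMQStream.
    val lines = ssc.zeroMQStream[String](
      "tcp://127.0.0.1:1234",
      Subscribe("updates"),
      bytesToStrings _)

    lines.print()
    ssc.start()
  }
}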
@@ -1205,12 +1211,12 @@ public class JavaAPISuite implements Serializable { } @Test - public void testNetworkTextStream() { + public void testSocketTextStream() { JavaDStream test = ssc.socketTextStream("localhost", 12345); } @Test - public void testNetworkString() { + public void testSocketString() { class Converter extends Function<InputStream, Iterable<String>> { public Iterable<String> call(InputStream in) { BufferedReader reader = new BufferedReader(new InputStreamReader(in)); @@ -1239,13 +1245,13 @@ public class JavaAPISuite implements Serializable { } @Test - public void testRawNetworkStream() { - JavaDStream test = ssc.rawNetworkStream("localhost", 12345); + public void testRawSocketStream() { + JavaDStream test = ssc.rawSocketStream("localhost", 12345); } @Test public void testFlumeStream() { - JavaDStream test = ssc.flumeStream("localhost", 12345); + JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); } @Test @@ -1253,4 +1259,25 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); } + + @Test + public void testTwitterStream() { + String[] filters = new String[] { "good", "bad", "ugly" }; + JavaDStream test = ssc.twitterStream("username", "password", filters, StorageLevel.MEMORY_ONLY()); + } + + @Test + public void testActorStream() { + JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY()); + } + + @Test + public void testZeroMQStream() { + JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() { + @Override + public Iterable<String> call(byte[][] b) throws Exception { + return null; + } + }); + } } diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index c9f941c5b8b646ab474534ba060fd45a75a18678..1024d3ac9790e801a060d90a9faab1d64237515b 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -42,7 +42,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } - test("network input stream") { + test("socket input stream") { // Start the server val testServer = new TestServer(testPort) testServer.start()
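To round out the renames exercised by these tests, here is a minimal Scala sketch of the socket-based input streams (socketTextStream, and rawSocketStream as renamed from rawNetworkStream). The host, ports, and local master are placeholders, and the storage level shown is just one reasonable choice.

import spark.storage.StorageLevel
import spark.streaming.{Seconds, StreamingContext}

object SocketStreamSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "SocketStreamSketch", Seconds(1))

    // Newline-delimited text read from a TCP socket.
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Pre-serialized blocks read from a TCP socket (formerly rawNetworkStream).
    val raw = ssc.rawSocketStream[String]("localhost", 9998)

    lines.print()
    raw.print()
    ssc.start()
  }
}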