Commit 8ad561dc authored by Tathagata Das

Added checkpointing and fault-tolerance semantics to the programming guide. Fixed the default checkpoint interval to be a multiple of the slide duration. Fixed the visibility of some classes and objects to clean up the docs.
parent f98c7da2
@@ -87,7 +87,7 @@ By default, the `pyspark` shell creates SparkContext that runs jobs locally.
 To connect to a non-local cluster, set the `MASTER` environment variable.
 For example, to use the `pyspark` shell with a [standalone Spark cluster](spark-standalone.html):
-{% highlight shell %}
+{% highlight bash %}
 $ MASTER=spark://IP:PORT ./pyspark
 {% endhighlight %}
This diff is collapsed.
@@ -132,7 +132,7 @@ abstract class DStream[T: ClassManifest] (
   // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
   if (mustCheckpoint && checkpointDuration == null) {
-    checkpointDuration = slideDuration.max(Seconds(10))
+    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
   }
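For context, a minimal standalone sketch (plain Scala; the object and method names are hypothetical, not part of the commit) of the rule the new line implements: round the 10-second target up to the nearest multiple of the slide duration, so checkpoints always fall on batch boundaries.

object CheckpointIntervalSketch {
  // Smallest multiple of slideMillis that is >= targetMillis.
  def defaultCheckpointMillis(slideMillis: Long, targetMillis: Long = 10000L): Long =
    slideMillis * math.ceil(targetMillis.toDouble / slideMillis).toInt

  def main(args: Array[String]): Unit = {
    println(defaultCheckpointMillis(3000L))   // 12000: ceil(10/3) = 4 intervals
    println(defaultCheckpointMillis(10000L))  // 10000: already a multiple
    println(defaultCheckpointMillis(15000L))  // 15000: slide longer than target
  }
}

With the old `slideDuration.max(Seconds(10))`, a 3-second slide produced a 10-second interval, which is not a multiple of 3 seconds; the new expression produces 12 seconds instead.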
@@ -16,7 +16,7 @@ case class Duration (private val millis: Long) {
   def * (times: Int): Duration = new Duration(millis * times)
-  def / (that: Duration): Long = millis / that.millis
+  def / (that: Duration): Double = millis.toDouble / that.millis.toDouble
   def isMultipleOf(that: Duration): Boolean =
     (this.millis % that.millis == 0)
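A quick REPL-style illustration (plain Long/Double values, not the `Duration` class itself) of why the changed return type matters:

val slideMillis  = 3000L
val targetMillis = 10000L

targetMillis / slideMillis                            // 3: old integral division truncates
targetMillis.toDouble / slideMillis.toDouble          // 3.333...: new Double division
math.ceil(targetMillis.toDouble / slideMillis).toInt  // 4, as used in DStream above

Callers that still need an integral quotient now convert explicitly, as the `CheckpointSuite` hunk below does.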
@@ -30,6 +30,7 @@ class Interval(val beginTime: Time, val endTime: Time) {
   override def toString = "[" + beginTime + ", " + endTime + "]"
 }
+private[streaming]
 object Interval {
   def currentInterval(duration: Duration): Interval = {
     val time = new Time(System.currentTimeMillis)
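Several hunks in this commit apply the same visibility pattern, sketched below with a hypothetical object: Scala's qualified modifier `private[streaming]` restricts access to the named enclosing package, which also keeps the definition out of the user-facing scaladoc.

package spark.streaming

// Hypothetical example: usable from code in spark.streaming (and its
// subpackages), invisible to user code and omitted from public docs.
private[streaming] object InternalClock {
  def nowMillis: Long = System.currentTimeMillis
}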
@@ -18,8 +18,8 @@ import org.apache.hadoop.conf.Configuration
 class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
 extends Serializable {
-  def ssc = self.ssc
+  private[streaming] def ssc = self.ssc
   private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
     new HashPartitioner(numPartitions)
@@ -242,7 +242,9 @@ extends Serializable {
  * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
  * The reduced value of over a new window is calculated using the old window's reduced value :
  *  1. reduce the new values that entered the window (e.g., adding new counts)
+ *
  *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ *
  * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
  * However, it is applicable to only "invertible reduce functions".
  * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
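A hedged usage sketch of the incremental form this comment documents (the socket source, durations, and context arguments are illustrative for the spark.streaming API of this era; the incremental form requires a checkpoint directory, per the `mustCheckpoint` logic touched above):

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

val ssc = new StreamingContext("local", "WindowedWordCounts", Seconds(10))
ssc.checkpoint("checkpoint")  // the incremental form must checkpoint

val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Each window's counts reuse the previous window's result: add counts
// from batches entering the window, subtract counts from batches leaving.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,  // reduce: new values entering the window
  (a: Int, b: Int) => a - b,  // inverse reduce: old values leaving it
  Seconds(60),                // window duration
  Seconds(10)                 // slide duration
)
windowedCounts.print()
ssc.start()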
@@ -399,7 +401,7 @@ extends Serializable {
   }
   /**
-   * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+   * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this`
    * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
    * key in both RDDs. Partitioner is used to partition each generated RDD.
    */
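And a similarly hedged sketch of the cogroup variant described here, reusing the `ssc` from the previous sketch (stream contents illustrative):

import spark.HashPartitioner

val left  = ssc.socketTextStream("localhost", 9999).map(w => (w, 1))
val right = ssc.socketTextStream("localhost", 9998).map(w => (w, 1))

// For each key, the generated RDDs pair the lists of values seen for
// that key in the corresponding RDDs of both streams; the supplied
// partitioner shapes each generated RDD.
val grouped = left.cogroup(right, new HashPartitioner(4))
grouped.print()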
@@ -13,6 +13,7 @@ import twitter4j.auth.BasicAuthorization
  * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
  * such that this may return a sampled subset of all tweets during each interval.
  */
+private[streaming]
 class TwitterInputDStream(
     @transient ssc_ : StreamingContext,
     username: String,
@@ -26,6 +27,7 @@ class TwitterInputDStream(
   }
 }
+private[streaming]
 class TwitterReceiver(
     username: String,
     password: String,
@@ -50,7 +50,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     val stateStreamCheckpointInterval = Seconds(1)
     // this ensure checkpointing occurs at least once
-    val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2
+    val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
     val secondNumBatches = firstNumBatches
     // Setup the streams
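The added `.toLong` follows from the `Duration./` change above: the quotient is now a `Double`, so the test converts it back to an integral batch count before using it.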