Skip to content
Snippets Groups Projects
Commit 577c8cc8 authored by Tathagata Das's avatar Tathagata Das
Browse files

Removed unncessary options from WindowedDStream.

parent 3579647c
No related branches found
No related tags found
No related merge requests found
......@@ -39,8 +39,6 @@ class WindowedDStream[T: ClassTag](
throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "true").toBoolean
parent.persist(StorageLevel.MEMORY_ONLY_SER)
def windowDuration: Duration = _windowDuration
......@@ -54,11 +52,11 @@ class WindowedDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
val rddsInWindow = parent.slice(currentWindow)
val windowRDD = if (useNewUnion && rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
logInfo("Using partition aware union")
val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
logDebug("Using partition aware union for windowing at " + validTime)
new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
} else {
logInfo("Using normal union")
logDebug("Using normal union for windowing at " + validTime)
new UnionRDD(ssc.sc,rddsInWindow)
}
Some(windowRDD)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment