Skip to content
Snippets Groups Projects
Commit 60e9b2bd authored by shaofei007's avatar shaofei007 Committed by Sean Owen
Browse files

[SPARK-21357][DSTREAMS] FileInputDStream not remove out of date RDD

## What changes were proposed in this pull request?

```DStreams
         class FileInputDStream

 [line 162]   protected[streaming] override def clearMetadata(time: Time) {
    batchTimeToSelectedFiles.synchronized {
      val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
      batchTimeToSelectedFiles --= oldFiles.keys

```
The above code does not remove the old generatedRDDs. "super.clearMetadata(time)" was added to the beginning of clearMetadata to remove the old generatedRDDs.

## How was this patch tested?

At the end of clearMetadata, the testing code (print the number of generatedRDDs) was added to check the old RDDS were removed manually.

Author: shaofei007 <1427357147@qq.com>
Author: Fei Shao <1427357147@qq.com>

Closes #18718 from shaofei007/master.
parent c1438203
No related branches found
No related tags found
No related merge requests found
......@@ -164,6 +164,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
/** Clear the old time-to-files mappings along with old RDDs */
protected[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
batchTimeToSelectedFiles.synchronized {
val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
batchTimeToSelectedFiles --= oldFiles.keys
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment