Skip to content
Snippets Groups Projects
Commit 271e3237 authored by Tathagata Das
Browse files

Minor changes in comments and strings to address comments in PR 289.

parent 3618d70b
No related branches found
No related tags found
No related merge requests found
...@@ -99,9 +99,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas ...@@ -99,9 +99,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
} }
/** /**
* Finds files which have modification timestamp <= current time. If some files are being * Find files which have modification timestamp <= current time and return a 3-tuple of
* deleted in the directory, then it can generate transient exceptions. Hence, multiple
* attempts are made to handle these transient exceptions. Returns 3-tuple
* (new files found, latest modification time among them, files with latest modification time) * (new files found, latest modification time among them, files with latest modification time)
*/ */
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = { private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
...@@ -116,9 +114,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas ...@@ -116,9 +114,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file)) val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
files.zip(fileRDDs).foreach { case (file, rdd) => { files.zip(fileRDDs).foreach { case (file, rdd) => {
if (rdd.partitions.size == 0) { if (rdd.partitions.size == 0) {
logWarning("File " + file + " has no data in it. Are you sure you are following " + logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"the move-based method of adding input files? Refer to the programming guide " + "files that have been \"moved\" to the directory assigned to the file stream. " +
"for more details.") "Refer to the streaming programming guide for more details.")
} }
}} }}
new UnionRDD(context.sparkContext, fileRDDs) new UnionRDD(context.sparkContext, fileRDDs)
...@@ -181,8 +179,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas ...@@ -181,8 +179,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
} }
/** /**
* Custom PathFilter class to find new files that have modification timestamps <= current time, but have not * Custom PathFilter class to find new files that have modification timestamps <= current time,
* been seen before (i.e. the file should not be in lastModTimeFiles) * but have not been seen before (i.e. the file should not be in lastModTimeFiles)
*/ */
private[streaming] private[streaming]
class CustomPathFilter(maxModTime: Long) extends PathFilter { class CustomPathFilter(maxModTime: Long) extends PathFilter {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment