Skip to content
Snippets Groups Projects
Commit 3618d70b authored by Tathagata Das
Browse files

Added warning if filestream adds files with no data in them (file RDDs have 0 partitions).

parent be647191
No related branches found
No related tags found
No related merge requests found
......@@ -114,6 +114,13 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
/**
 * Build a single RDD that covers every path in `files`.
 *
 * One Hadoop-file RDD is created per path; any path whose RDD turns out to have
 * zero partitions is reported through `logWarning`, and the per-file RDDs are
 * then combined into one `UnionRDD`.
 *
 * @param files paths of the files to read
 * @return the union of the per-file RDDs
 */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
  val sc = context.sparkContext
  val perFileRDDs = files.map(path => sc.newAPIHadoopFile[K, V, F](path))

  // The emptiness check runs only after every per-file RDD has been created.
  for ((path, hadoopRDD) <- files.zip(perFileRDDs) if hadoopRDD.partitions.isEmpty) {
    logWarning(
      "File " + path + " has no data in it. Are you sure you are following " +
        "the move-based method of adding input files? Refer to the programming guide " +
        "for more details.")
  }

  new UnionRDD(sc, perFileRDDs)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment