From 271e3237f3e2efa62a94d6936fce551a40edd65f Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Fri, 27 Dec 2013 12:26:57 -0800
Subject: [PATCH] Minor changes in comments and strings to address comments in
 PR 289.

---
 .../spark/streaming/dstream/FileInputDStream.scala | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 95224282f6..fb9eda8996 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -99,9 +99,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   }
 
   /**
-   * Finds files which have modification timestamp <= current time. If some files are being
-   * deleted in the directory, then it can generate transient exceptions. Hence, multiple
-   * attempts are made to handle these transient exceptions. Returns 3-tuple
+   * Find files which have modification timestamp <= current time and return a 3-tuple of
    * (new files found, latest modification time among them, files with latest modification time)
    */
   private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
@@ -116,9 +114,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
     files.zip(fileRDDs).foreach { case (file, rdd) => {
       if (rdd.partitions.size == 0) {
-        logWarning("File " + file + " has no data in it. Are you sure you are following " +
-          "the move-based method of adding input files? Refer to the programming guide " +
-          "for more details.")
+        logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
+          "files that have been \"moved\" to the directory assigned to the file stream. " +
+          "Refer to the streaming programming guide for more details.")
       }
     }}
     new UnionRDD(context.sparkContext, fileRDDs)
@@ -181,8 +179,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   }
 
   /**
-   * Custom PathFilter class to find new files that have modification timestamps <= current time, but have not
-   * been seen before (i.e. the file should not be in lastModTimeFiles)
+   * Custom PathFilter class to find new files that have modification timestamps <= current time,
+   * but have not been seen before (i.e. the file should not be in lastModTimeFiles)
    */
   private[streaming]
   class CustomPathFilter(maxModTime: Long) extends PathFilter {
-- 
GitLab