Skip to content
Snippets Groups Projects
Commit fcd17a1e authored by Tathagata Das's avatar Tathagata Das
Browse files

Fixed comments and long lines based on comments on PR 289.

parent 271e3237
No related branches found
No related tags found
No related merge requests found
......@@ -91,7 +91,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
}
// Save to file, and reload it as an RDD
val broadcastedConf = rdd.context.broadcast(new SerializableWritable(rdd.context.hadoopConfiguration))
val broadcastedConf = rdd.context.broadcast(
new SerializableWritable(rdd.context.hadoopConfiguration))
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
......
......@@ -81,8 +81,8 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends
attempts += 1
try {
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
// This is inherently thread unsafe .. so alleviating it by writing to '.new' and
// then doing moves : which should be pretty fast.
// This is inherently thread unsafe, so alleviating it by writing to '.new' and
// then moving it to the final file
val fos = fs.create(writeFile)
fos.write(bytes)
fos.close()
......
......@@ -371,7 +371,8 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* File names starting with . are ignored.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
......@@ -390,6 +391,8 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system.
* @param directory HDFS directory to monitor for new file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
......@@ -410,7 +413,9 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). File names starting with . are ignored.
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): DStream[String] = {
......
......@@ -256,9 +256,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
* Creates a input stream that monitors a Hadoop-compatible filesystem
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). File names starting with . are ignored.
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): JavaDStream[String] = {
......@@ -300,9 +302,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
* Creates a input stream that monitors a Hadoop-compatible filesystem
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* File names starting with . are ignored.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
......@@ -331,7 +334,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Creates a input stream from a Flume source.
* Create a input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment