Commit a06a9d5c authored by Jey Kottalam

Rename HadoopWriter to SparkHadoopWriter since it's outside of our package

parent 8f979ede
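
The class being renamed is declared in Hadoop's org.apache.hadoop.mapred package rather than in Spark's own spark package (that is what "outside of our package" refers to), presumably so it can reach package-private Hadoop APIs; the Spark prefix keeps the name from colliding with, or being mistaken for, a genuine Hadoop class. Below is a minimal Scala sketch of that visibility technique under hypothetical names: org.example.hadooplike and Internals stand in for Hadoop and its internals and are not real APIs.

package org.example.hadooplike

// Stand-in for a third-party library's package-private internals:
// visible only to code declared inside org.example.hadooplike.
private[hadooplike] object Internals {
  def createId(prefix: String, n: Int): String = prefix + "_" + n
}

// Declared in the foreign package so it can call the package-private API.
// The distinctive prefix signals that, despite its package, this object
// is not part of the host library; the same reasoning as this rename.
object SparkStyleHelper {
  def newId(n: Int): String = Internals.createId("job", n)
}

Code outside the package can call SparkStyleHelper.newId but cannot touch Internals directly, which mirrors the position SparkHadoopWriter presumably occupies with respect to Hadoop's mapred internals.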
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.mapred.FileOutputCommitter
 import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.HadoopWriter
+import org.apache.hadoop.mapred.SparkHadoopWriter
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.OutputFormat
@@ -653,7 +653,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
     conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
+    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
     saveAsHadoopDataset(conf)
   }
@@ -679,7 +679,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
-    val writer = new HadoopWriter(conf)
+    val writer = new SparkHadoopWriter(conf)
     writer.preSetup()
 
     def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) {
@@ -36,7 +36,7 @@ import spark.SerializableWritable
  * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
-class HadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable {
+class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable {
 
   private val now = new Date()
   private val conf = new SerializableWritable(jobConf)
@@ -165,7 +165,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoop
     splitID = splitid
     attemptID = attemptid
 
-    jID = new SerializableWritable[JobID](HadoopWriter.createJobID(now, jobid))
+    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
     taID = new SerializableWritable[TaskAttemptID](
         new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
   }
@@ -179,7 +179,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoop
   }
 }
 
-object HadoopWriter {
+object SparkHadoopWriter {
   def createJobID(time: Date, id: Int): JobID = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
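
For completeness, a hedged usage sketch of the save path these hunks touch, written against the pre-1.0, ClassManifest-era Spark API that appears in this diff. The SparkContext setup, app name, and output path are assumptions for illustration only.

import org.apache.hadoop.mapred.TextOutputFormat
import spark.SparkContext
import spark.SparkContext._  // implicit conversion to PairRDDFunctions

val sc = new SparkContext("local", "writer-demo")  // assumed local context
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))

// saveAsHadoopFile fills in a JobConf (output classes, FileOutputCommitter,
// output path via SparkHadoopWriter.createPathFromString) and then writes
// each partition through a SparkHadoopWriter, per the hunks above.
pairs.saveAsHadoopFile[TextOutputFormat[String, Int]]("/tmp/writer-demo-out")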