Commit acc01ab3 authored by CodingCat, committed by Reynold Xin

SPARK-2038: rename "conf" parameters in the saveAsHadoop functions with source-compatibility

https://issues.apache.org/jira/browse/SPARK-2038

to differentiate it from the SparkConf object while keeping source-level compatibility

Author: CodingCat <zhunansjtu@gmail.com>

Closes #1137 from CodingCat/SPARK-2038 and squashes the following commits:

11abeba [CodingCat] revise the comments
7ee5712 [CodingCat] to keep the source-compatibility
763975f [CodingCat] style fix
d91288d [CodingCat] rename "conf" parameters in the saveAsHadoop functions
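The change itself is mechanical, and the pattern is easy to state in isolation. The sketch below is a minimal, hypothetical illustration of it (ExampleSink and save are made-up names, not Spark APIs): the public parameter keeps the name conf, so existing callers that pass the argument by name still compile, while the method body immediately rebinds it to hadoopConf so the Hadoop Configuration is not confused with the RDD's SparkConf (self.conf) later in the method.

import org.apache.hadoop.conf.Configuration

class ExampleSink {
  // The parameter is deliberately still named `conf`: callers that write
  // `save(path, conf = myConf)` keep compiling (source compatibility).
  def save(path: String, conf: Configuration): Unit = {
    // Rebind to a more descriptive name for internal use only; nothing about
    // the public signature changes.
    val hadoopConf = conf
    hadoopConf.set("mapred.output.dir", path)
    // ... configure and submit the Hadoop job using hadoopConf ...
  }
}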
parent 22036aeb
@@ -762,7 +762,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
       conf: Configuration = self.context.hadoopConfiguration)
   {
-    val job = new NewAPIHadoopJob(conf)
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val job = new NewAPIHadoopJob(hadoopConf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
@@ -795,22 +797,25 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
       conf: JobConf = new JobConf(self.context.hadoopConfiguration),
       codec: Option[Class[_ <: CompressionCodec]] = None) {
-    conf.setOutputKeyClass(keyClass)
-    conf.setOutputValueClass(valueClass)
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    hadoopConf.setOutputKeyClass(keyClass)
+    hadoopConf.setOutputValueClass(valueClass)
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    conf.set("mapred.output.format.class", outputFormatClass.getName)
+    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
-      conf.setCompressMapOutput(true)
-      conf.set("mapred.output.compress", "true")
-      conf.setMapOutputCompressorClass(c)
-      conf.set("mapred.output.compression.codec", c.getCanonicalName)
-      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+      hadoopConf.setCompressMapOutput(true)
+      hadoopConf.set("mapred.output.compress", "true")
+      hadoopConf.setMapOutputCompressorClass(c)
+      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
+      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
-    saveAsHadoopDataset(conf)
+    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(hadoopConf,
+      SparkHadoopWriter.createPathFromString(path, hadoopConf))
+    saveAsHadoopDataset(hadoopConf)
   }
 
   /**
@@ -820,7 +825,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * configured for a Hadoop MapReduce job.
    */
   def saveAsNewAPIHadoopDataset(conf: Configuration) {
-    val job = new NewAPIHadoopJob(conf)
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val job = new NewAPIHadoopJob(hadoopConf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
@@ -877,9 +884,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * MapReduce job.
    */
   def saveAsHadoopDataset(conf: JobConf) {
-    val outputFormatInstance = conf.getOutputFormat
-    val keyClass = conf.getOutputKeyClass
-    val valueClass = conf.getOutputValueClass
+    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
+    val hadoopConf = conf
+    val outputFormatInstance = hadoopConf.getOutputFormat
+    val keyClass = hadoopConf.getOutputKeyClass
+    val valueClass = hadoopConf.getOutputValueClass
     if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
@@ -889,18 +898,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    SparkHadoopUtil.get.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(hadoopConf)
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
-      val ignoredFs = FileSystem.get(conf)
-      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
+      val ignoredFs = FileSystem.get(hadoopConf)
+      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
     }
 
-    val writer = new SparkHadoopWriter(conf)
+    val writer = new SparkHadoopWriter(hadoopConf)
     writer.preSetup()
 
     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
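Because only the internal identifier changed and the public parameter is still named conf, existing call sites that rely on named or default arguments are unaffected. A hedged usage sketch against the old-API saveAsHadoopFile overload touched above (the SparkContext sc, the output path, and the sample data are assumptions for illustration):

import org.apache.spark.SparkContext._            // pair-RDD functions, Spark 1.x style
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}

// Hypothetical pair RDD; `sc` is an assumed SparkContext.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }

val jobConf = new JobConf(sc.hadoopConfiguration)
pairs.saveAsHadoopFile(
  "hdfs:///tmp/spark-2038-demo",                  // assumed output path
  classOf[Text],
  classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]],
  conf = jobConf)                                 // named argument still compiles:
                                                  // the external parameter name is unchanged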