Commit c337844e authored by Cheng Lian

[SPARK-8604] [SQL] HadoopFsRelation subclasses should set their output format class

`HadoopFsRelation` subclasses, especially `ParquetRelation2`, should set their own output format class, so that the default output committer can be set up correctly when appending (where user-defined output committers are ignored).

Author: Cheng Lian <lian@databricks.com>

Closes #6998 from liancheng/spark-8604 and squashes the following commits:

9be51d1 [Cheng Lian] Adds more comments
6db1368 [Cheng Lian] HadoopFsRelation subclasses should set their output format class
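
For context, here is a minimal sketch of the committer resolution this fix relies on. It approximates the append write path rather than quoting it; the helper name and its signature are illustrative only. When appending, the committer is derived from whatever output format class is registered on the `Job`, so a relation that never registers one ends up with the committer of an unrelated default format.

import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext}

// Illustrative helper (not Spark source): instantiate the Job's registered output
// format and ask it for its committer; this is what "default output committer"
// means in the description above.
def defaultCommitterFor(job: Job, taskContext: TaskAttemptContext): OutputCommitter = {
  val outputFormat = job.getOutputFormatClass.newInstance()
  outputFormat.getOutputCommitter(taskContext)
}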
parent 7bac2fe7
@@ -194,6 +194,12 @@ private[sql] class ParquetRelation2(
       committerClass,
       classOf[ParquetOutputCommitter])
 
+    // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
+    // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
+    // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is
+    // bundled with `ParquetOutputFormat[Row]`.
+    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
     // TODO There's no need to use two kinds of WriteSupport
     // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
     // complex types.
...
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -194,6 +194,16 @@ private[sql] class OrcRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    job.getConfiguration match {
+      case conf: JobConf =>
+        conf.setOutputFormat(classOf[OrcOutputFormat])
+      case conf =>
+        conf.setClass(
+          "mapred.output.format.class",
+          classOf[OrcOutputFormat],
+          classOf[MapRedOutputFormat[_, _]])
+    }
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,
...
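A note on the two branches above: `OrcOutputFormat` implements the old `org.apache.hadoop.mapred` API, so it cannot be registered through `Job.setOutputFormatClass`, which only accepts `org.apache.hadoop.mapreduce` output formats. The standalone sketch below, assuming a fresh `JobConf` outside any Spark job, shows that the typed setter and the property-based fallback write the same `mapred.output.format.class` entry.

import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat}

val conf = new JobConf()
// The typed setter is only available when the configuration is a JobConf...
conf.setOutputFormat(classOf[OrcOutputFormat])
// ...and it stores the class under the same key the fallback branch sets directly.
assert(
  conf.getClass("mapred.output.format.class", null, classOf[MapRedOutputFormat[_, _]]) ==
    classOf[OrcOutputFormat])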
@@ -119,6 +119,8 @@ class SimpleTextRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+    job.setOutputFormatClass(classOf[TextOutputFormat[_, _]])
+
     override def newInstance(
         path: String,
         dataSchema: StructType,
...
@@ -719,4 +719,25 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       }
     }
   }
+
+  test("SPARK-8604: Parquet data source should write summary file while doing appending") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(0, 5)
+      df.write.mode(SaveMode.Overwrite).parquet(path)
+
+      val summaryPath = new Path(path, "_metadata")
+      val commonSummaryPath = new Path(path, "_common_metadata")
+
+      val fs = summaryPath.getFileSystem(configuration)
+      fs.delete(summaryPath, true)
+      fs.delete(commonSummaryPath, true)
+
+      df.write.mode(SaveMode.Append).parquet(path)
+      checkAnswer(sqlContext.read.parquet(path), df.unionAll(df))
+
+      assert(fs.exists(summaryPath))
+      assert(fs.exists(commonSummaryPath))
+    }
+  }
 }