Skip to content
Snippets Groups Projects
Commit 6844d36a authored by hyukjinkwon's avatar hyukjinkwon Committed by Reynold Xin
Browse files

[SPARK-12871][SQL] Support to specify the option for compression codec.

https://issues.apache.org/jira/browse/SPARK-12871
This PR added an option to support to specify compression codec.
This adds the option `codec` as an alias `compression` as filed in [SPARK-12668 ](https://issues.apache.org/jira/browse/SPARK-12668).

Note that I did not add configurations for Hadoop 1.x as this `CsvRelation` is using Hadoop 2.x API and I guess it is going to drop Hadoop 1.x support.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #10805 from HyukjinKwon/SPARK-12420.
parent 488bbb21
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,10 @@ package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.Charset
import org.apache.hadoop.io.compress._
import org.apache.spark.Logging
import org.apache.spark.util.Utils
private[sql] case class CSVParameters(@transient parameters: Map[String, String]) extends Logging {
......@@ -35,7 +38,7 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]
private def getBool(paramName: String, default: Boolean = false): Boolean = {
val param = parameters.getOrElse(paramName, default.toString)
if (param.toLowerCase() == "true") {
if (param.toLowerCase == "true") {
true
} else if (param.toLowerCase == "false") {
false
......@@ -73,6 +76,11 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]
val nullValue = parameters.getOrElse("nullValue", "")
val compressionCodec: Option[String] = {
val name = parameters.get("compression").orElse(parameters.get("codec"))
name.map(CSVCompressionCodecs.getCodecClassName)
}
val maxColumns = 20480
val maxCharsPerColumn = 100000
......@@ -85,7 +93,6 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]
}
private[csv] object ParseModes {
val PERMISSIVE_MODE = "PERMISSIVE"
val DROP_MALFORMED_MODE = "DROPMALFORMED"
val FAIL_FAST_MODE = "FAILFAST"
......@@ -107,3 +114,28 @@ private[csv] object ParseModes {
true // We default to permissive is the mode string is not valid
}
}
private[csv] object CSVCompressionCodecs {
private val shortCompressionCodecNames = Map(
"bzip2" -> classOf[BZip2Codec].getName,
"gzip" -> classOf[GzipCodec].getName,
"lz4" -> classOf[Lz4Codec].getName,
"snappy" -> classOf[SnappyCodec].getName)
/**
* Return the full version of the given codec class.
* If it is already a class name, just return it.
*/
def getCodecClassName(name: String): String = {
val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
try {
// Validate the codec name
Utils.classForName(codecName)
codecName
} catch {
case e: ClassNotFoundException =>
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
}
}
}
......@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.hadoop.mapreduce.RecordWriter
......@@ -99,6 +100,15 @@ private[csv] class CSVRelation(
}
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
val conf = job.getConfiguration
params.compressionCodec.foreach { codec =>
conf.set("mapreduce.output.fileoutputformat.compress", "true")
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
conf.set("mapreduce.map.output.compress", "true")
conf.set("mapreduce.map.output.compress.codec", codec)
}
new CSVOutputWriterFactory(params)
}
......
......@@ -349,4 +349,30 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
assert(results(0).toSeq === Array(2012, "Tesla", "S", "null", "null"))
assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
}
test("save csv with compression codec option") {
withTempDir { dir =>
val csvDir = new File(dir, "csv").getCanonicalPath
val cars = sqlContext.read
.format("csv")
.option("header", "true")
.load(testFile(carsFile))
cars.coalesce(1).write
.format("csv")
.option("header", "true")
.option("compression", "gZiP")
.save(csvDir)
val compressedFiles = new File(csvDir).listFiles()
assert(compressedFiles.exists(_.getName.endsWith(".gz")))
val carsCopy = sqlContext.read
.format("csv")
.option("header", "true")
.load(csvDir)
verifyCars(carsCopy, withHeader = true)
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment