Commit 03c7b7c4 authored by sureshthalamati, committed by Wenchen Fan

[SPARK-15315][SQL] Adding error check to the CSV datasource writer for unsupported complex data types.

## What changes were proposed in this pull request?

Adds error handling to the CSV datasource writer for unsupported complex data types (array, map, and struct). Currently, garbage gets written to the output CSV files if the DataFrame schema contains complex data types.
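For illustration, a minimal sketch of the new behavior, assuming `spark.implicits._` is in scope; the output path and values below are hypothetical:

```scala
// Before this change, writing a DataFrame with a complex column silently
// produced unusable CSV output; with the check, the write fails fast.
Seq((1, Map("Tesla" -> 3))).toDF("id", "cars")
  .write.csv("/tmp/cars_out")  // hypothetical output path
// => UnsupportedOperationException:
//    "CSV data source does not support map<string,int> data type."
```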

## How was this patch tested?

Added a new unit test case.

Author: sureshthalamati <suresh.thalamati@gmail.com>

Closes #13105 from sureshthalamati/csv_complex_types_SPARK-15315.
parent 37c617e4
```diff
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -86,6 +86,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    verifySchema(dataSchema)
     val conf = job.getConfiguration
     val csvOptions = new CSVOptions(options)
     csvOptions.compressionCodec.foreach { codec =>
@@ -172,4 +173,15 @@ class DefaultSource extends FileFormat with DataSourceRegister {
         .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
     }
   }
+
+  private def verifySchema(schema: StructType): Unit = {
+    schema.foreach { field =>
+      field.dataType match {
+        case _: ArrayType | _: MapType | _: StructType =>
+          throw new UnsupportedOperationException(
+            s"CSV data source does not support ${field.dataType.simpleString} data type.")
+        case _ =>
+      }
+    }
+  }
 }
```
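For context, here is a minimal sketch (not part of the commit) of how `verifySchema` behaves on a sample schema; the schema below is hypothetical:

```scala
import org.apache.spark.sql.types._

// Hypothetical schema with one supported and one unsupported column.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("cars", MapType(StringType, IntegerType))))

// StructType is a Seq[StructField], so schema.foreach visits each top-level
// field. The map column matches the unsupported case, and verifySchema throws
// before any output files are written:
//   "CSV data source does not support map<string,int> data type."
```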
```diff
@@ -31,6 +31,8 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
 
 class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
+  import testImplicits._
+
   private val carsFile = "cars.csv"
   private val carsMalformedFile = "cars-malformed.csv"
   private val carsFile8859 = "cars_iso-8859-1.csv"
@@ -582,4 +584,24 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     assert(numbers.count() == 8)
   }
 
+  test("error handling for unsupported data types.") {
+    withTempDir { dir =>
+      val csvDir = new File(dir, "csv").getCanonicalPath
+      var msg = intercept[UnsupportedOperationException] {
+        Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support struct<a:int,b:string> data type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support map<string,int> data type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support array<string> data type"))
+    }
+  }
 }
```