diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index f47ed76cba76520b661bd2a2f6b5f0852d165f6b..057bde1a75f07ce8ea9f4da6e3844463dd2cffb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -86,6 +86,8 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
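+    // Fail fast if the schema contains column types that CSV cannot represent.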
+    verifySchema(dataSchema)
     val conf = job.getConfiguration
     val csvOptions = new CSVOptions(options)
     csvOptions.compressionCodec.foreach { codec =>
@@ -172,4 +173,16 @@
         .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
     }
   }
+
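+  /** Throws if the schema contains complex types (array, map, struct); CSV is a flat format. */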
+  private def verifySchema(schema: StructType): Unit = {
+    schema.foreach { field =>
+      field.dataType match {
+        case _: ArrayType | _: MapType | _: StructType =>
+          throw new UnsupportedOperationException(
+            s"CSV data source does not support ${field.dataType.simpleString} data type.")
+        case _ =>
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 27d6dc9197d27fcd5f9dd255b4b6278d2b4e0ed7..bae290776f6e1b6fe8db7edf70fcba43cca31cf0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
 
 class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
+  import testImplicits._
+
   private val carsFile = "cars.csv"
   private val carsMalformedFile = "cars-malformed.csv"
   private val carsFile8859 = "cars_iso-8859-1.csv"
@@ -582,4 +584,25 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
     assert(numbers.count() == 8)
   }
+
+  test("error handling for unsupported data types.") {
+    withTempDir { dir =>
+      val csvDir = new File(dir, "csv").getCanonicalPath
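+      // Each write should fail during schema verification, before any files are written.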
+      var msg = intercept[UnsupportedOperationException] {
+        Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support struct<a:int,b:string> data type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support map<string,int> data type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support array<string> data type"))
+    }
+  }
 }