diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 4ed6728994193c61fa3d079c623f901506f75176..bd144c9575c722ec6122f9eda1f12766b1a4e9c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -278,7 +278,7 @@ class JacksonParser( // We cannot parse this token based on the given data type. So, we throw a // RuntimeException and this exception will be caught by `parse` method. throw new RuntimeException( - s"Failed to parse a value for data type $dataType (current token: $token).") + s"Failed to parse a value for data type ${dataType.catalogString} (current token: $token).") } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala index 159aef220be1568e38992fbfc4a9872c08efb178..43591a9ff524a0f82b266f71c3aa470eaaf1593b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.util._ @@ -65,7 +66,8 @@ class FailureSafeParser[IN]( case DropMalformedMode => Iterator.empty case FailFastMode => - throw e.cause + throw new SparkException("Malformed records are detected in record parsing. " + + s"Parse Mode: ${FailFastMode.name}.", e.cause) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala index fb632cf2bb70ede3fda1de55100bc0b931d94137..a270a6451d5dd32633d672730abf94069bd696c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala @@ -21,6 +21,7 @@ import java.util.Comparator import com.fasterxml.jackson.core._ +import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil @@ -61,7 +62,8 @@ private[sql] object JsonInferSchema { case DropMalformedMode => None case FailFastMode => - throw e + throw new SparkException("Malformed records are detected in schema inference. " + + s"Parse Mode: ${FailFastMode.name}.", e) } } } @@ -231,8 +233,9 @@ private[sql] object JsonInferSchema { case FailFastMode => // If `other` is not struct type, consider it as malformed one and throws an exception. - throw new RuntimeException("Failed to infer a common schema. Struct types are expected" + - s" but ${other.catalogString} was found.") + throw new SparkException("Malformed records are detected in schema inference. " + + s"Parse Mode: ${FailFastMode.name}. Reasons: Failed to infer a common schema. " + + s"Struct types are expected, but `${other.catalogString}` was found.") } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index e66a60d7503f33a08cd9f7269b282b8ab15d2904..65472cda9c1c02521746ccf964bd3d6160d61080 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1036,24 +1036,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Corrupt records: FAILFAST mode") { - val schema = StructType( - StructField("a", StringType, true) :: Nil) // `FAILFAST` mode should throw an exception for corrupt records. val exceptionOne = intercept[SparkException] { spark.read .option("mode", "FAILFAST") .json(corruptRecords) - } - assert(exceptionOne.getMessage.contains("JsonParseException")) + }.getMessage + assert(exceptionOne.contains( + "Malformed records are detected in schema inference. Parse Mode: FAILFAST.")) val exceptionTwo = intercept[SparkException] { spark.read .option("mode", "FAILFAST") - .schema(schema) + .schema("a string") .json(corruptRecords) .collect() - } - assert(exceptionTwo.getMessage.contains("JsonParseException")) + }.getMessage + assert(exceptionTwo.contains( + "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) } test("Corrupt records: DROPMALFORMED mode") { @@ -1944,7 +1944,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .option("mode", "FAILFAST") .json(path) } - assert(exceptionOne.getMessage.contains("Failed to infer a common schema")) + assert(exceptionOne.getMessage.contains("Malformed records are detected in schema " + + "inference. Parse Mode: FAILFAST.")) val exceptionTwo = intercept[SparkException] { spark.read @@ -1954,7 +1955,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .json(path) .collect() } - assert(exceptionTwo.getMessage.contains("Failed to parse a value")) + assert(exceptionTwo.getMessage.contains("Malformed records are detected in record " + + "parsing. Parse Mode: FAILFAST.")) } }