diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
new file mode 100644
index 0000000000000000000000000000000000000000..985f0dc1cd60ecd7e522058c8198be5ac77bd6f0
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Exception thrown when the underlying parser meet a bad record and can't parse it.
+ * @param record a function to return the record that cause the parser to fail
+ * @param partialResult a function that returns an optional row, which is the partial result of
+ *                      parsing this bad record.
+ * @param cause the actual exception about why the record is bad and can't be parsed.
+ */
+case class BadRecordException(
+    record: () => UTF8String,
+    partialResult: () => Option[InternalRow],
+    cause: Throwable) extends Exception(cause)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e6d2b1bc28d95b9649c797a57eb23a4a9972ae4b..6c238618f2af7881e192285cddd984dbaf1a9f76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,11 +27,10 @@ import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.catalyst.util.FailureSafeParser
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser}
 import org.apache.spark.sql.execution.datasources.csv._
-import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.types.{StringType, StructType}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
similarity index 82%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
index 725e3015b3416892b49f896d630c20f0ec91bdd4..159aef220be1568e38992fbfc4a9872c08efb178 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalyst.util
+package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -69,15 +70,3 @@ class FailureSafeParser[IN](
     }
   }
 }
-
-/**
- * Exception thrown when the underlying parser meet a bad record and can't parse it.
- * @param record a function to return the record that cause the parser to fail
- * @param partialResult a function that returns an optional row, which is the partial result of
- *                      parsing this bad record.
- * @param cause the actual exception about why the record is bad and can't be parsed.
- */
-case class BadRecordException(
-    record: () => UTF8String,
-    partialResult: () => Option[InternalRow],
-    cause: Throwable) extends Exception(cause)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 263f77e11c4da9f9022e5719e5690069c492ced7..c3657acb7d86747747543ebd97c96df068460d60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -30,7 +30,8 @@ import com.univocity.parsers.csv.CsvParser
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser}
+import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.FailureSafeParser
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 51e952c12202e1e50bffe920e187eac2f3d14a4d..4f2963da9ace929779a06ae6c2d14b016c307c3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -33,8 +33,7 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.catalyst.util.FailureSafeParser
-import org.apache.spark.sql.execution.datasources.{CodecStreams, DataSource, HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String