diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a69a2b5645ddb46699c982e8c3b9965778f1460c..78aed4fb584c44ec90fa57f52fb303ae5ed57f11 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -203,7 +203,8 @@ package object config {
 
   private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
-      "encountering corrupt files and contents that have been read will still be returned.")
+      "encountering corrupted or non-existing files and contents that have been read will still " +
+      "be returned.")
     .booleanConf
     .createWithDefault(false)
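A minimal usage sketch of the core flag (the app name and input path below are hypothetical placeholders, not from this patch): with the flag set, a job over a directory containing a truncated or unreadable file logs a warning and returns the records it could read instead of failing the task.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo: enable the flag before creating the context.
val conf = new SparkConf()
  .setAppName("ignore-corrupt-files-demo")
  .setMaster("local[*]")
  .set("spark.files.ignoreCorruptFiles", "true")
val sc = new SparkContext(conf)

// Hypothetical path; a corrupted .gz part file here no longer fails the job.
val lines = sc.textFile("/data/logs/*.gz")
println(lines.count())
sc.stop()
```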
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 6e87233cd98707ab53ac96d0fca3fe765c229718..a83e139c13eef273d78846e0118089581433340c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -248,12 +248,20 @@ class HadoopRDD[K, V](
       HadoopRDD.addLocalConfiguration(
         new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
         context.stageId, theSplit.index, context.attemptNumber, jobConf)
-      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
+      reader =
+        try {
+          inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
+        } catch {
+          case e: IOException if ignoreCorruptFiles =>
+            logWarning(s"Skipped the rest of the corrupted file: ${split.inputSplit}", e)
+            finished = true
+            null
+        }
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener{ context => closeIfNeeded() }
-      private val key: K = reader.createKey()
-      private val value: V = reader.createValue()
+      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
+      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
 
       override def getNext(): (K, V) = {
         try {
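The pattern in this hunk: reader acquisition may throw, so it is wrapped in a try/catch that, when the flag is on, marks the iterator finished and yields `null`, which the `key`/`value` initializers then guard against. A standalone sketch of the same guard, assuming hypothetical stand-ins (`GuardedIterator`, `open`) for the Hadoop reader machinery:

```scala
import java.io.IOException

// `open` stands in for inputFormat.getRecordReader; all names here are
// hypothetical, not part of the patch.
class GuardedIterator(open: () => AutoCloseable, ignoreCorruptFiles: Boolean) {
  private var finished = false
  private val reader: AutoCloseable =
    try {
      open()
    } catch {
      case e: IOException if ignoreCorruptFiles =>
        println(s"Skipped the rest of the corrupted file: ${e.getMessage}")
        finished = true
        null // callers must null-check, as the key/value initializers do above
    }

  def hasNext: Boolean = !finished && reader != null
}
```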
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index e805192bb6a670206e9efca7d482ed8ace1c25cf..733e85f305f058b1479b47ab6223e8869e86df1c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -174,14 +174,25 @@ class NewHadoopRDD[K, V](
       }
       private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
       private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-      private var reader = format.createRecordReader(
-        split.serializableHadoopSplit.value, hadoopAttemptContext)
-      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+      private var finished = false
+      private var reader =
+        try {
+          val _reader = format.createRecordReader(
+            split.serializableHadoopSplit.value, hadoopAttemptContext)
+          _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+          _reader
+        } catch {
+          case e: IOException if ignoreCorruptFiles =>
+            logWarning(
+              s"Skipped the rest of the corrupted file: ${split.serializableHadoopSplit}",
+              e)
+            finished = true
+            null
+        }
 
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener(context => close())
       private var havePair = false
-      private var finished = false
       private var recordsSinceMetricsUpdate = 0
 
       override def hasNext: Boolean = {
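Note why `finished` moved from the bottom of this hunk to above the reader initializer: Scala runs class-body initializers in declaration order, so the catch block can only safely set a field that has already been initialized. A minimal illustration of the pitfall the move avoids (the class and field names here are hypothetical):

```scala
import java.io.IOException

class InitOrder {
  // Declared first, so the initializer below can safely set it.
  var finished = false
  val reader: String =
    try {
      throw new IOException("corrupt")
    } catch {
      case _: IOException =>
        finished = true
        null
    }
  // Had `finished` been declared after `reader`, its own `= false`
  // initializer would have run afterwards and silently reset the flag.
}
```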
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 6d8cd81431515c45e42dff5df62542acc5d1af04..e753cd962a1a12c86a9766793a06d579117c22d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -151,6 +151,9 @@ class FileScanRDD(
               currentIterator = readFunction(currentFile)
             }
           } catch {
+            case e: IOException if ignoreCorruptFiles =>
+              logWarning(s"Skipped the rest of the corrupted file: $currentFile", e)
+              currentIterator = Iterator.empty
             case e: java.io.FileNotFoundException =>
               throw new java.io.FileNotFoundException(
                 e.getMessage + "\n" +
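Clause order matters in this catch block: `java.io.FileNotFoundException` extends `IOException`, so placing the guarded `IOException` case first means missing files are also skipped when the flag is on, and only fall through to the rethrow-with-hint case when it is off. That is what the doc updates' mention of non-existent files refers to. A minimal sketch of the same ordering (the function name is hypothetical):

```scala
import java.io.{FileNotFoundException, IOException}

// Hypothetical classifier mirroring the clause order above.
def onReadError(e: Throwable, ignoreCorruptFiles: Boolean): String = e match {
  case _: IOException if ignoreCorruptFiles => "skip the file and keep going"
  case _: FileNotFoundException => "rethrow with an explanatory hint"
  case other => throw other
}

// onReadError(new FileNotFoundException("gone"), ignoreCorruptFiles = true)
//   returns "skip the file and keep going", because FileNotFoundException
//   is a subclass of IOException and the guarded case matches first.
```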
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 91f3fe0fe9549c1c65fe9b52bea50e723a5b9547..c03e88b60e5ce8600f780ee42f755947dc8a8efc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -632,7 +632,8 @@ object SQLConf {
 
   val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
-      "encountering corrupt files and contents that have been read will still be returned.")
+      "encountering corrupted or non-existing and contents that have been read will still be " +
+      "returned.")
     .booleanConf
     .createWithDefault(false)
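A corresponding SQL-side sketch (the app name and path are hypothetical): this flag is separate from the core `spark.files.ignoreCorruptFiles` one and governs file-based data sources read through `FileScanRDD`.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("sql-ignore-corrupt-files-demo")
  .master("local[*]")
  .getOrCreate()

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Hypothetical path: corrupted or deleted files under it are logged
// and skipped; rows from readable files are still returned.
val df = spark.read.parquet("/data/events")
println(df.count())
spark.stop()
```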