diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 644e5d65d612c3e6579fb49424edbea182b3ac3e..dd3c96a792357a4200a6de3532b0737f027f5d83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -83,40 +83,10 @@ class ListingFileCatalog(
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+          getOrElse(Array.empty)
       }
+      mutable.LinkedHashSet(statuses: _*)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index e0e569bca48b4c9714c0a5fcad4a2bde9523bea0..7f3eed3fb1f80ca7b1ebf3b759c207f7db2321e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -381,6 +381,16 @@ private[sql] object HadoopFsRelation extends Logging {
       }
       statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
         case f: LocatedFileStatus => f
+
+        // NOTE:
+        //
+        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
+        //   operations, calling `getFileBlockLocations` does no harm here since these file system
+        //   implementations don't actually issue RPC for this method.
+        //
+        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
+        //   paths exceeds threshold.
         case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 52dda8c6ace842f69f02817a18853e557c4fd09a..25f1443e7080a0f0fe0982e3569049724c8103a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -490,6 +490,7 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
 
   override def getFileBlockLocations(
       file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+    require(!file.isDirectory, "The file path can not be a directory.")
     val count = invocations.getAndAdd(1)
     Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 0, len))
   }