Commit 76aa45d3 authored by Xin Wu, committed by Cheng Lian

[SPARK-14959][SQL] handle partitioned table directories in distributed filesystem

## What changes were proposed in this pull request?
##### The root cause:
When `DataSource.resolveRelation` builds a `ListingFileCatalog` object, `listLeafFiles` is invoked, and a list of `FileStatus` objects is retrieved from the provided path. These `FileStatus` objects include the directories for the partitions (id=0 and id=2 in the JIRA). For each such `FileStatus`, `getFileBlockLocations` is then invoked as well, but `DistributedFileSystem` does not allow this call on directories, hence the exception.
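
A minimal sketch of the failing pattern described above (the namenode host and table path here are placeholders, not taken from this PR):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Placeholder location of a partitioned table root containing
// subdirectories such as id=0/ and id=2/.
val root = new Path("hdfs://namenode:8020/user/spark/partitioned_table")
val fs = root.getFileSystem(new Configuration())

fs.listStatus(root).foreach { f =>
  // `f` may be a partition directory. DistributedFileSystem rejects
  // getFileBlockLocations on directories ("Path is not a file"),
  // so this call throws for those entries.
  fs.getFileBlockLocations(f, 0, f.getLen)
}
```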

This PR removes the block of code that invokes `getFileBlockLocations` for every `FileStatus` object under the provided path. Instead, we call `HadoopFsRelation.listLeafFiles` directly, because this utility method filters out the directories before calling `getFileBlockLocations` to generate `LocatedFileStatus` objects.
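
The essential idea, as a simplified sketch (this is not the actual `HadoopFsRelation.listLeafFiles` implementation, which also applies `shouldFilterOut` and a parallel-listing path): recurse into directories and call `getFileBlockLocations` only on regular files.

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus, PathFilter}

// Simplified sketch: directories are expanded recursively, and block
// locations are fetched only for leaf files, never for directories.
def leafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Seq[FileStatus] = {
  if (status.isDirectory) {
    val children = fs.listStatus(status.getPath)
    val accepted =
      if (filter != null) children.filter(c => filter.accept(c.getPath)) else children
    accepted.toSeq.flatMap(c => leafFiles(fs, c, filter))
  } else {
    // Safe: `status` is guaranteed to be a regular file here.
    Seq(new LocatedFileStatus(status, fs.getFileBlockLocations(status, 0, status.getLen)))
  }
}
```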

## How was this patch tested?
Regression tests were run. Manual test in `spark-shell`:
```
scala> spark.read.format("parquet").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_part").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+

       spark.read.format("orc").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_orc").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+
```
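
For reference, a hypothetical way to produce the partitioned input read above (only the column names and the path come from the output shown; the rest is an assumption):

```scala
import spark.implicits._

// Writes a table partitioned by `id`, yielding id=0/ and id=1/ directories.
Seq(("hello", 0), ("world", 0), ("hello", 1), ("there", 1))
  .toDF("text", "id")
  .write
  .partitionBy("id")
  .parquet("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_part")
```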
I also tried it with two levels of partitioning.
I have not found a way to add a test case to the unit test suite that exercises a real HDFS file location. Any suggestions will be appreciated; one possible direction is sketched below.
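
Following the `require` this patch adds to `LocalityTestFileSystem` in the diff below, one option is to emulate HDFS's directory check on top of a local filesystem, so the regression can be caught without a real cluster. The class here is only a sketch; its name and wiring are assumptions:

```scala
import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}

// RawLocalFileSystem happily fabricates block locations even for
// directories, which is why this bug never surfaced in local tests.
// This subclass adds the directory check that DistributedFileSystem
// enforces, so listing a partitioned table through it would fail the
// same way HDFS does.
class StrictLocalFileSystem extends RawLocalFileSystem {
  override def getFileBlockLocations(
      file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
    require(!file.isDirectory, "The file path can not be a directory.")
    super.getFileBlockLocations(file, start, len)
  }
}
```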

Author: Xin Wu <xinwu@us.ibm.com>

Closes #13463 from xwu0226/SPARK-14959.
parent 6dde2740
```diff
@@ -83,40 +83,10 @@ class ListingFileCatalog(
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
-      }
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+          getOrElse(Array.empty)
+      }
+
+      mutable.LinkedHashSet(statuses: _*)
     }
   }

@@ -381,6 +381,16 @@ private[sql] object HadoopFsRelation extends Logging {
     }
     statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
       case f: LocatedFileStatus => f
+
+      // NOTE:
+      //
+      // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
+      //   operations, calling `getFileBlockLocations` does no harm here since these file system
+      //   implementations don't actually issue RPC for this method.
+      //
+      // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+      //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
+      //   paths exceeds threshold.
       case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
     }
   }

@@ -490,6 +490,7 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
 
   override def getFileBlockLocations(
       file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+    require(!file.isDirectory, "The file path can not be a directory.")
     val count = invocations.getAndAdd(1)
     Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 0, len))
   }
```