Commit e4ce1bc4 authored by jerryshao, committed by Sean Owen

[SPARK-15659][SQL] Ensure FileSystem is gotten from path

## What changes were proposed in this pull request?

Currently `spark.sql.warehouse.dir` points to a local directory by default, which throws an exception when HADOOP_CONF_DIR is configured and the default FS is HDFS:

```
java.lang.IllegalArgumentException: Wrong FS: file:/Users/sshao/projects/apache-spark/spark-warehouse, expected: hdfs://localhost:8020
```

So we should always get the `FileSystem` from the `Path` to avoid the wrong-FS problem.
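
To illustrate the difference, here is a minimal sketch (not part of the patch; the warehouse path is hypothetical): `FileSystem.get(conf)` always returns the default filesystem configured via `fs.defaultFS`, while `Path.getFileSystem(conf)` resolves the filesystem from the path's own scheme, so a `file:` path keeps working even when the default FS is HDFS.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
// As if picked up from core-site.xml via HADOOP_CONF_DIR
conf.set("fs.defaultFS", "hdfs://localhost:8020")

// Hypothetical local warehouse location
val warehouse = new Path("file:/tmp/spark-warehouse")

val defaultFs = FileSystem.get(conf)          // the default (HDFS) filesystem
val pathFs    = warehouse.getFileSystem(conf) // LocalFileSystem, matching the path's scheme

// defaultFs.mkdirs(warehouse) would fail with "Wrong FS: file:/... expected: hdfs://...",
// whereas pathFs.mkdirs(warehouse) creates the directory locally.
```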

## How was this patch tested?

Local test.
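
For context, one way such a local setup could look (a rough sketch, not from the PR; the app name and paths are made up), assuming HADOOP_CONF_DIR points at a configuration with an HDFS `fs.defaultFS` and Spark runs with its built-in (non-Hive) catalog:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SPARK-15659-repro") // hypothetical app name
  // hypothetical local warehouse dir; the default also resolves to a local file: path
  .config("spark.sql.warehouse.dir", "file:/tmp/spark-warehouse")
  .getOrCreate()

// Goes through InMemoryCatalog.createDatabase -> fs.mkdirs on the file: location;
// before this patch that used the default (HDFS) FileSystem and hit the "Wrong FS" error.
spark.sql("CREATE DATABASE repro_db")
spark.stop()
```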

Author: jerryshao <sshao@hortonworks.com>

Closes #13405 from jerryshao/SPARK-15659.
parent 1dd92564
@@ -22,7 +22,7 @@ import java.io.IOException
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
@@ -105,8 +105,6 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
     }
   }
 
-  private val fs = FileSystem.get(hadoopConfig)
-
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -120,7 +118,9 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       }
     } else {
       try {
-        fs.mkdirs(new Path(dbDefinition.locationUri))
+        val location = new Path(dbDefinition.locationUri)
+        val fs = location.getFileSystem(hadoopConfig)
+        fs.mkdirs(location)
       } catch {
         case e: IOException =>
           throw new SparkException(s"Unable to create database ${dbDefinition.name} as failed " +
@@ -147,7 +147,9 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       // Remove the database.
       val dbDefinition = catalog(db).db
       try {
-        fs.delete(new Path(dbDefinition.locationUri), true)
+        val location = new Path(dbDefinition.locationUri)
+        val fs = location.getFileSystem(hadoopConfig)
+        fs.delete(location, true)
       } catch {
         case e: IOException =>
           throw new SparkException(s"Unable to drop database ${dbDefinition.name} as failed " +
@@ -203,6 +205,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       if (tableDefinition.tableType == CatalogTableType.MANAGED) {
         val dir = new Path(catalog(db).db.locationUri, table)
         try {
+          val fs = dir.getFileSystem(hadoopConfig)
          fs.mkdirs(dir)
        } catch {
          case e: IOException =>
@@ -223,6 +226,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
     if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
       val dir = new Path(catalog(db).db.locationUri, table)
       try {
+        val fs = dir.getFileSystem(hadoopConfig)
        fs.delete(dir, true)
      } catch {
        case e: IOException =>
@@ -248,6 +252,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       val oldDir = new Path(catalog(db).db.locationUri, oldName)
       val newDir = new Path(catalog(db).db.locationUri, newName)
       try {
+        val fs = oldDir.getFileSystem(hadoopConfig)
        fs.rename(oldDir, newDir)
      } catch {
        case e: IOException =>
@@ -338,6 +343,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
           p.spec.get(col).map(col + "=" + _)
         }.mkString("/")
         try {
+          val fs = tableDir.getFileSystem(hadoopConfig)
          fs.mkdirs(new Path(tableDir, partitionPath))
        } catch {
          case e: IOException =>
@@ -373,6 +379,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
           p.get(col).map(col + "=" + _)
         }.mkString("/")
         try {
+          val fs = tableDir.getFileSystem(hadoopConfig)
          fs.delete(new Path(tableDir, partitionPath), true)
        } catch {
          case e: IOException =>
@@ -409,6 +416,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
           newSpec.get(col).map(col + "=" + _)
         }.mkString("/")
         try {
+          val fs = tableDir.getFileSystem(hadoopConfig)
          fs.rename(new Path(tableDir, oldPath), new Path(tableDir, newPath))
        } catch {
          case e: IOException =>