From e4ce1bc4f3ca088365ff199e563f23a552dc88ef Mon Sep 17 00:00:00 2001
From: jerryshao <sshao@hortonworks.com>
Date: Wed, 1 Jun 2016 08:28:19 -0500
Subject: [PATCH] [SPARK-15659][SQL] Ensure FileSystem is gotten from path

## What changes were proposed in this pull request?

Currently `spark.sql.warehouse.dir` points to a local directory by default, which throws an exception when HADOOP_CONF_DIR is configured and the default FS is HDFS:

```
java.lang.IllegalArgumentException: Wrong FS: file:/Users/sshao/projects/apache-spark/spark-warehouse, expected: hdfs://localhost:8020
```

So we should always get the `FileSystem` from the `Path` to avoid the wrong-FS problem.

## How was this patch tested?

Local test.

Author: jerryshao <sshao@hortonworks.com>

Closes #13405 from jerryshao/SPARK-15659.
---
 .../sql/catalyst/catalog/InMemoryCatalog.scala     | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 489a1c8c3f..60525794ed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -22,7 +22,7 @@ import java.io.IOException
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
@@ -105,8 +105,6 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
     }
   }
 
-  private val fs = FileSystem.get(hadoopConfig)
-
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -120,7 +118,9 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       }
     } else {
       try {
-        fs.mkdirs(new Path(dbDefinition.locationUri))
+        val location = new Path(dbDefinition.locationUri)
+        val fs = location.getFileSystem(hadoopConfig)
+        fs.mkdirs(location)
       } catch {
         case e: IOException =>
           throw new SparkException(s"Unable to create database ${dbDefinition.name} as failed " +
@@ -147,7 +147,9 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       // Remove the database.
       val dbDefinition = catalog(db).db
       try {
-        fs.delete(new Path(dbDefinition.locationUri), true)
+        val location = new Path(dbDefinition.locationUri)
+        val fs = location.getFileSystem(hadoopConfig)
+        fs.delete(location, true)
       } catch {
         case e: IOException =>
           throw new SparkException(s"Unable to drop database ${dbDefinition.name} as failed " +
@@ -203,6 +205,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
     if (tableDefinition.tableType == CatalogTableType.MANAGED) {
       val dir = new Path(catalog(db).db.locationUri, table)
       try {
+        val fs = dir.getFileSystem(hadoopConfig)
         fs.mkdirs(dir)
       } catch {
         case e: IOException =>
@@ -223,6 +226,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
     if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
       val dir = new Path(catalog(db).db.locationUri, table)
       try {
+        val fs = dir.getFileSystem(hadoopConfig)
         fs.delete(dir, true)
       } catch {
         case e: IOException =>
@@ -248,6 +252,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
     val oldDir = new Path(catalog(db).db.locationUri, oldName)
     val newDir = new Path(catalog(db).db.locationUri, newName)
     try {
+      val fs = oldDir.getFileSystem(hadoopConfig)
       fs.rename(oldDir, newDir)
     } catch {
       case e: IOException =>
@@ -338,6 +343,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
           p.spec.get(col).map(col + "=" + _)
         }.mkString("/")
         try {
+          val fs = tableDir.getFileSystem(hadoopConfig)
           fs.mkdirs(new Path(tableDir, partitionPath))
         } catch {
           case e: IOException =>
@@ -373,6 +379,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
           p.get(col).map(col + "=" + _)
         }.mkString("/")
         try {
+          val fs = tableDir.getFileSystem(hadoopConfig)
           fs.delete(new Path(tableDir, partitionPath), true)
         } catch {
           case e: IOException =>
@@ -409,6 +416,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
       newSpec.get(col).map(col + "=" + _)
     }.mkString("/")
     try {
+      val fs = tableDir.getFileSystem(hadoopConfig)
       fs.rename(new Path(tableDir, oldPath), new Path(tableDir, newPath))
     } catch {
       case e: IOException =>
-- 
GitLab
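
For context: every hunk above applies the same pattern, replacing the single cached `FileSystem.get(hadoopConfig)` handle with a per-path `path.getFileSystem(hadoopConfig)` lookup. `FileSystem.get(conf)` resolves whatever `fs.defaultFS` says, so once HADOOP_CONF_DIR points at an HDFS cluster it returns an `hdfs://` filesystem that rejects `file:/` paths with the "Wrong FS" error quoted in the description. A minimal, self-contained sketch of the before/after behavior (not part of the patch; the object and path names are illustrative):

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative sketch, assuming the Hadoop client libraries are on the
// classpath; WrongFsSketch and the warehouse path are made-up names.
object WrongFsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Imagine fs.defaultFS = hdfs://localhost:8020 (e.g. via HADOOP_CONF_DIR),
    // while the warehouse location is an explicit local-FS path.
    val warehouse = new Path("file:/tmp/spark-warehouse")

    // Before the patch: FileSystem.get(conf) binds to the *default* FS.
    // With an hdfs:// default, handing it a file:/ path fails with
    // java.lang.IllegalArgumentException: Wrong FS: file:/..., expected: hdfs://...
    // val fs = FileSystem.get(conf)
    // fs.mkdirs(warehouse) // throws when the schemes disagree

    // After the patch: resolve the FileSystem from the Path itself, so the
    // path's scheme selects the matching FileSystem implementation.
    val fs: FileSystem = warehouse.getFileSystem(conf)
    fs.mkdirs(warehouse)
  }
}
```

The trade-off is a cached field versus correctness across mixed schemes; since Hadoop's `FileSystem` layer caches instances per scheme and authority, the repeated `getFileSystem` calls should be cheap.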