diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 93f1ad01bf9aa35ff007fb03b996cba7ffce4c98..5f17fdf9467db42980336a289f91f690f8964548 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -132,28 +132,20 @@ case class DataSource(
           // Found the data source using fully qualified path
           dataSource
         case Failure(error) =>
-          if (error.isInstanceOf[ClassNotFoundException]) {
-            val className = error.getMessage
-            if (spark2RemovedClasses.contains(className)) {
-              throw new ClassNotFoundException(s"$className is removed in Spark 2.0. " +
-                "Please check if your library is compatible with Spark 2.0")
-            }
-          }
-          if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-            throw new ClassNotFoundException(
-              "The ORC data source must be used with Hive support enabled.", error)
+          if (provider.toLowerCase == "orc" ||
+            provider.startsWith("org.apache.spark.sql.hive.orc")) {
+            throw new AnalysisException(
+              "The ORC data source must be used with Hive support enabled")
+          } else if (provider.toLowerCase == "avro" ||
+            provider == "com.databricks.spark.avro") {
+            throw new AnalysisException(
+              s"Failed to find data source: ${provider.toLowerCase}. Please use Spark " +
+                "package http://spark-packages.org/package/databricks/spark-avro")
           } else {
-            if (provider == "avro" || provider == "com.databricks.spark.avro") {
-              throw new ClassNotFoundException(
-                s"Failed to find data source: $provider. Please use Spark package " +
-                  "http://spark-packages.org/package/databricks/spark-avro",
-                error)
-            } else {
-              throw new ClassNotFoundException(
-                s"Failed to find data source: $provider. Please find packages at " +
-                  "http://spark-packages.org",
-                error)
-            }
+            throw new ClassNotFoundException(
+              s"Failed to find data source: $provider. Please find packages at " +
+                "http://spark-packages.org",
+              error)
           }
       }
     } catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b622f859413a6d62a003b5901fc233ec7552c8a1..9afd715016d88242841554169205851532139394 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -28,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 
 /**
- * Try to replaces [[UnresolvedRelation]]s with [[ResolvedDataSource]].
+ * Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]].
  */
 private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
@@ -38,6 +40,16 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo
           sparkSession,
           paths = u.tableIdentifier.table :: Nil,
           className = u.tableIdentifier.database.get)
+
+        val notSupportDirectQuery = try {
+          !classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+        } catch {
+          case NonFatal(e) => false
+        }
+        if (notSupportDirectQuery) {
+          throw new AnalysisException("Unsupported data source type for direct query on files: " +
+            s"${u.tableIdentifier.database.get}")
+        }
         val plan = LogicalRelation(dataSource.resolveRelation())
         u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan)
       } catch {
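Both hunks above change what a user sees when a direct query on files (select ... from provider.`path`) cannot be served: unsupported providers now fail analysis with an AnalysisException instead of leaking a ClassNotFoundException from class loading. A minimal spark-shell sketch of the new behavior; the session setup and the paths are illustrative, not part of the patch:

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("direct-query-errors")
      .getOrCreate()

    // Without Hive support, the ORC source is rejected during analysis.
    try {
      spark.sql("select id from orc.`/tmp/some_orc_dir`").show()
    } catch {
      // "The ORC data source must be used with Hive support enabled"
      case e: AnalysisException => println(e.getMessage)
    }

    // A provider that resolves but is not a FileFormat (jdbc here) is
    // rejected by the new check in ResolveDataSource.
    try {
      spark.sql("select id from jdbc.`/tmp/unused_path`").show()
    } catch {
      // "Unsupported data source type for direct query on files: jdbc"
      case e: AnalysisException => println(e.getMessage)
    }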
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1a7f6ebbb29564b9edfcd74453a94d9a885b2ac9..4fcd6bc0d9ec95b773f335609b4d15d3f0e6f4c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1838,20 +1838,61 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         df)
       })
 
-    val e1 = intercept[AnalysisException] {
+    var e = intercept[AnalysisException] {
       sql("select * from in_valid_table")
     }
-    assert(e1.message.contains("Table or view not found"))
+    assert(e.message.contains("Table or view not found"))
 
-    val e2 = intercept[AnalysisException] {
+    e = intercept[AnalysisException] {
       sql("select * from no_db.no_table").show()
     }
-    assert(e2.message.contains("Table or view not found"))
+    assert(e.message.contains("Table or view not found"))
 
-    val e3 = intercept[AnalysisException] {
+    e = intercept[AnalysisException] {
       sql("select * from json.invalid_file")
     }
-    assert(e3.message.contains("Path does not exist"))
+    assert(e.message.contains("Path does not exist"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `org.apache.spark.sql.hive.orc`.`file_path`")
+    }
+    assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `com.databricks.spark.avro`.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: com.databricks.spark.avro. " +
+      "Please use Spark package http://spark-packages.org/package/databricks/spark-avro"))
+
+    // data source type is case insensitive
+    e = intercept[AnalysisException] {
+      sql(s"select id from Avro.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: avro. Please use Spark package " +
+      "http://spark-packages.org/package/databricks/spark-avro"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from avro.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: avro. Please use Spark package " +
+      "http://spark-packages.org/package/databricks/spark-avro"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`")
+    }
+    assert(e.message.contains("Table or view not found: " +
+      "`org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `Jdbc`.`file_path`")
+    }
+    assert(e.message.contains("Unsupported data source type for direct query on files: Jdbc"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `org.apache.spark.sql.execution.datasources.jdbc`.`file_path`")
+    }
+    assert(e.message.contains("Unsupported data source type for direct query on files: " +
+      "org.apache.spark.sql.execution.datasources.jdbc"))
   }
 
   test("SortMergeJoin returns wrong results when using UnsafeRows") {
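The new assertions exercise the FileFormat gate added in rules.scala, which is just a JVM subtype check on the provider class. It can be illustrated standalone; the stub class below is hypothetical and stands in for any provider that is not file-based:

    import org.apache.spark.sql.execution.datasources.FileFormat

    // Mirrors the rule's test: direct query on files is allowed only when the
    // providing class implements FileFormat.
    def supportsDirectQuery(providingClass: Class[_]): Boolean =
      classOf[FileFormat].isAssignableFrom(providingClass)

    // A FileFormat-backed provider (CSV) passes the check...
    println(supportsDirectQuery(
      Class.forName("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")))  // true

    // ...while a provider that is not a FileFormat does not.
    class StubProvider
    println(supportsDirectQuery(classOf[StubProvider]))  // false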
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
index f07c33042a72e54b570af8d443209b4c8d848963..85ba33e58a7872f1eb4b1d544b42e07a1934d821 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{AnalysisException, SQLContext}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
@@ -42,9 +42,10 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
   }
 
   test("should fail to load ORC without Hive Support") {
-    intercept[ClassNotFoundException] {
+    val e = intercept[AnalysisException] {
       spark.read.format("orc").load()
     }
+    assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
   }
 
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 320aaea1e4eeb8b41607958e139445d928b8dda3..5ea1f32433699962eb1cc241d67caa89ec2f54cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.datasources.DataSource
 
 class ResolvedDataSourceSuite extends SparkFunSuite {
@@ -60,13 +61,22 @@ class ResolvedDataSourceSuite extends SparkFunSuite {
       classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
   }
 
+  test("csv") {
+    assert(
+      getProvidingClass("csv") ===
+        classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
+    assert(
+      getProvidingClass("com.databricks.spark.csv") ===
+        classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
+  }
+
   test("error message for unknown data sources") {
-    val error1 = intercept[ClassNotFoundException] {
+    val error1 = intercept[AnalysisException] {
       getProvidingClass("avro")
     }
     assert(error1.getMessage.contains("spark-packages"))
 
-    val error2 = intercept[ClassNotFoundException] {
+    val error2 = intercept[AnalysisException] {
       getProvidingClass("com.databricks.spark.avro")
     }
     assert(error2.getMessage.contains("spark-packages"))
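ResolvedDataSourceSuite resolves providers by name only, so no live session is needed. A sketch of an equivalent helper, assuming DataSource's case-class parameters as used above (getProvidingClass here is a stand-in for the suite's private helper):

    import org.apache.spark.sql.execution.datasources.DataSource

    // providingClass is derived from className alone, so a null session is
    // enough for lookup-only checks.
    def getProvidingClass(name: String): Class[_] =
      DataSource(sparkSession = null, className = name).providingClass

    // The short name and the legacy package name resolve to the same class.
    assert(getProvidingClass("csv") == getProvidingClass("com.databricks.spark.csv"))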
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b5691450ca2f032a729faa648921629c1ca1af55..24de223cf899b8dad4703ffb600fd719eb11078b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1247,11 +1247,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("run sql directly on files") {
+  test("run sql directly on files - parquet") {
     val df = spark.range(100).toDF()
     withTempPath(f => {
       df.write.parquet(f.getCanonicalPath)
-      checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
+      // data source type is case insensitive
+      checkAnswer(sql(s"select id from Parquet.`${f.getCanonicalPath}`"),
         df)
       checkAnswer(sql(s"select id from `org.apache.spark.sql.parquet`.`${f.getCanonicalPath}`"),
         df)
@@ -1260,6 +1261,49 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     })
   }
 
+  test("run sql directly on files - orc") {
+    val df = spark.range(100).toDF()
+    withTempPath(f => {
+      df.write.orc(f.getCanonicalPath)
+      // data source type is case insensitive
+      checkAnswer(sql(s"select id from ORC.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.hive.orc`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from orc.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
+  test("run sql directly on files - csv") {
+    val df = spark.range(100).toDF()
+    withTempPath(f => {
+      df.write.csv(f.getCanonicalPath)
+      // data source type is case insensitive
+      checkAnswer(sql(s"select cast(_c0 as int) id from CSV.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(
+        sql(s"select cast(_c0 as int) id from `com.databricks.spark.csv`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select cast(a._c0 as int) id from csv.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
+  test("run sql directly on files - json") {
+    val df = spark.range(100).toDF()
+    withTempPath(f => {
+      df.write.json(f.getCanonicalPath)
+      // data source type is case insensitive
+      checkAnswer(sql(s"select id from jsoN.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
   test("SPARK-8976 Wrong Result for Rollup #1") {
     checkAnswer(sql(
       "SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"),
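The four tests above all lean on the same behavior: the provider segment before the backticked path is resolved case-insensitively, and legacy fully qualified names keep working. A spark-shell sketch of the same pattern; the session setup and the scratch path are illustrative:

    val spark = org.apache.spark.sql.SparkSession.builder()
      .master("local[1]")
      .appName("sql-on-files")
      .getOrCreate()

    val path = "/tmp/sql_on_files_json"  // illustrative scratch location
    spark.range(100).write.mode("overwrite").json(path)

    // All three spellings resolve to the JSON file format:
    spark.sql(s"select id from json.`$path`").show()
    spark.sql(s"select id from jsoN.`$path`").show()
    spark.sql(s"select id from `org.apache.spark.sql.json`.`$path`").show()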