From bcb1ff81468eb4afc7c03b2bca18e99cc1ccf6b8 Mon Sep 17 00:00:00 2001 From: Cheng Hao <hao.cheng@intel.com> Date: Tue, 19 May 2015 15:20:46 -0700 Subject: [PATCH] [SPARK-7662] [SQL] Resolve correct names for generator in projection ``` select explode(map(value, key)) from src; ``` Throws exception ``` org.apache.spark.sql.AnalysisException: The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 2 aliases but got _c0 ; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:43) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveGenerate$$makeGeneratorOutput(Analyzer.scala:605) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16$$anonfun$22.apply(Analyzer.scala:562) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16$$anonfun$22.apply(Analyzer.scala:548) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16.applyOrElse(Analyzer.scala:548) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$$anonfun$apply$16.applyOrElse(Analyzer.scala:538) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222) ``` Author: Cheng Hao <hao.cheng@intel.com> Closes #6178 from chenghao-intel/explode and squashes the following commits: 916fbe9 [Cheng Hao] add more strict rules for TGF alias 5c3f2c5 [Cheng Hao] fix bug in unit test e1d93ab [Cheng Hao] Add more unit test 19db09e [Cheng Hao] resolve names for generator in projection --- .../sql/catalyst/analysis/Analyzer.scala | 15 +++++++++++ .../sql/hive/execution/HiveQuerySuite.scala | 6 ++--- .../sql/hive/execution/SQLQuerySuite.scala | 25 ++++++++++++++++++- 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index dfa4215f2e..c239e83271 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -561,6 +561,21 @@ class Analyzer( /** Extracts a [[Generator]] expression and any names assigned by aliases to their output. */ private object AliasedGenerator { def unapply(e: Expression): Option[(Generator, Seq[String])] = e match { + case Alias(g: Generator, name) + if g.elementTypes.size > 1 && java.util.regex.Pattern.matches("_c[0-9]+", name) => { + // Assume the default name given by parser is "_c[0-9]+", + // TODO in long term, move the naming logic from Parser to Analyzer. + // In projection, Parser gave default name for TGF as does for normal UDF, + // but the TGF probably have multiple output columns/names. + // e.g. SELECT explode(map(key, value)) FROM src; + // Let's simply ignore the default given name for this case. + Some((g, Nil)) + } + case Alias(g: Generator, name) if g.elementTypes.size > 1 => + // If not given the default names, and the TGF with multiple output columns + failAnalysis( + s"""Expect multiple names given for ${g.getClass.getName}, + |but only single name '${name}' specified""".stripMargin) case Alias(g: Generator, name) => Some((g, name :: Nil)) case MultiAlias(g: Generator, names) => Some(g, names) case _ => None diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 089a57e25c..e7aec0b188 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -111,13 +111,13 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { | SELECT key FROM gen_tmp ORDER BY key ASC; """.stripMargin) - test("multiple generator in projection") { + test("multiple generators in projection") { intercept[AnalysisException] { - sql("SELECT explode(map(key, value)), key FROM src").collect() + sql("SELECT explode(array(key, key)), explode(array(key, key)) FROM src").collect() } intercept[AnalysisException] { - sql("SELECT explode(map(key, value)) as k1, k2, key FROM src").collect() + sql("SELECT explode(array(key, key)) as k1, explode(array(key, key)) FROM src").collect() } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index e60d00e635..fbbf6ba594 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -548,13 +548,36 @@ class SQLQuerySuite extends QueryTest { dropTempTable("data") } - test("resolve udtf with single alias") { + test("resolve udtf in projection #1") { val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""")) read.json(rdd).registerTempTable("data") val df = sql("SELECT explode(a) AS val FROM data") val col = df("val") } + test("resolve udtf in projection #2") { + val rdd = sparkContext.makeRDD((1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}""")) + jsonRDD(rdd).registerTempTable("data") + checkAnswer(sql("SELECT explode(map(1, 1)) FROM data LIMIT 1"), Row(1, 1) :: Nil) + checkAnswer(sql("SELECT explode(map(1, 1)) as (k1, k2) FROM data LIMIT 1"), Row(1, 1) :: Nil) + intercept[AnalysisException] { + sql("SELECT explode(map(1, 1)) as k1 FROM data LIMIT 1") + } + + intercept[AnalysisException] { + sql("SELECT explode(map(1, 1)) as (k1, k2, k3) FROM data LIMIT 1") + } + } + + // TGF with non-TGF in project is allowed in Spark SQL, but not in Hive + test("TGF with non-TGF in projection") { + val rdd = sparkContext.makeRDD( """{"a": "1", "b":"1"}""" :: Nil) + jsonRDD(rdd).registerTempTable("data") + checkAnswer( + sql("SELECT explode(map(a, b)) as (k1, k2), a, b FROM data"), + Row("1", "1", "1", "1") :: Nil) + } + test("logical.Project should not be resolved if it contains aggregates or generators") { // This test is used to test the fix of SPARK-5875. // The original issue was that Project's resolved will be true when it contains -- GitLab