From b734ed0c229373dbc589b9eca7327537ca458138 Mon Sep 17 00:00:00 2001 From: Eric Liang <ekl@google.com> Date: Tue, 9 Sep 2014 23:47:12 -0700 Subject: [PATCH] [SPARK-3395] [SQL] DSL sometimes incorrectly reuses attribute ids, breaking queries This resolves https://issues.apache.org/jira/browse/SPARK-3395 Author: Eric Liang <ekl@google.com> Closes #2266 from ericl/spark-3395 and squashes the following commits: 7f2b6f0 [Eric Liang] add regression test 05bd1e4 [Eric Liang] in the dsl, create a new schema instance in each applySchema --- .../scala/org/apache/spark/sql/SchemaRDD.scala | 3 ++- .../scala/org/apache/spark/sql/DslQuerySuite.scala | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 33b2ed1b3a..d2ceb4a2b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -428,7 +428,8 @@ class SchemaRDD( */ private def applySchema(rdd: RDD[Row]): SchemaRDD = { new SchemaRDD(sqlContext, - SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))(sqlContext)) + SparkLogicalPlan( + ExistingRdd(queryExecution.analyzed.output.map(_.newInstance), rdd))(sqlContext)) } // ======================================================================= diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala index 1a6a6c1747..d001abb7e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.test._ /* Implicits */ @@ -133,6 +135,18 @@ class DslQuerySuite extends QueryTest { mapData.take(1).toSeq) } + test("SPARK-3395 limit distinct") { + val filtered = TestData.testData2 + .distinct() + .orderBy(SortOrder('a, Ascending), SortOrder('b, Ascending)) + .limit(1) + .registerTempTable("onerow") + checkAnswer( + sql("select * from onerow inner join testData2 on onerow.a = testData2.a"), + (1, 1, 1, 1) :: + (1, 1, 1, 2) :: Nil) + } + test("average") { checkAnswer( testData2.groupBy()(avg('a)), -- GitLab