diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index bf221e0d7cfc40e63bebae7dcf7db2d3f549bcef..eb46c0e72ecd9326ad7c3454ce3f55d04b7f5786 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -524,6 +524,10 @@ class Analyzer( val newVersion = oldVersion.newInstance() (oldVersion, newVersion) + case oldVersion: SerializeFromObject + if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty => + (oldVersion, oldVersion.copy(serializer = oldVersion.serializer.map(_.newInstance()))) + // Handle projects that create conflicting aliases. case oldVersion @ Project(projectList, _) if findAliases(projectList).intersect(conflictingAttributes).nonEmpty => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 0b6874e3b8ad32a4697c346055689be09a0661f0..a3881ff920159cac15424fd27adf4a8c64408722 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -769,6 +769,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext { } } + test("mapped dataset should resolve duplicated attributes for self join") { + val ds = Seq(1, 2, 3).toDS().map(_ + 1) + val ds1 = ds.as("d1") + val ds2 = ds.as("d2") + + checkDataset(ds1.joinWith(ds2, $"d1.value" === $"d2.value"), (2, 2), (3, 3), (4, 4)) + checkDataset(ds1.intersect(ds2), 2, 3, 4) + checkDataset(ds1.except(ds1)) + } + test("SPARK-15441: Dataset outer join") { val left = Seq(ClassData("a", 1), ClassData("b", 2)).toDS().as("left") val right = Seq(ClassData("x", 2), ClassData("y", 3)).toDS().as("right")