From 8288e16a5a5a12a45207c13a1341c707c6b4b940 Mon Sep 17 00:00:00 2001
From: jerryshao <sshao@hortonworks.com>
Date: Wed, 1 Jun 2016 21:58:05 -0700
Subject: [PATCH] [SPARK-15620][SQL] Fix transformed dataset attributes resolve
 failure

## What changes were proposed in this pull request?

Joining a transformed dataset with itself produces conflicting attributes, which makes query execution fail. For example:

```
val dataset = Seq(1, 2, 3).toDS()
val mappedDs = dataset.map(_ + 1)

mappedDs.as("t1").joinWith(mappedDs.as("t2"), $"t1.value" === $"t2.value").show()
```

will throw exception:

```
org.apache.spark.sql.AnalysisException: cannot resolve '`t1.value`' given input columns: [value];
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:62)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:59)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:287)
```

## How was this patch tested?

Unit test.

Author: jerryshao <sshao@hortonworks.com>

Closes #13399 from jerryshao/SPARK-15620.
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala  |  4 ++++
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 10 ++++++++++
 2 files changed, 14 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index bf221e0d7c..eb46c0e72e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -524,6 +524,10 @@ class Analyzer(
           val newVersion = oldVersion.newInstance()
           (oldVersion, newVersion)
 
+        case oldVersion: SerializeFromObject
+            if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
+          (oldVersion, oldVersion.copy(serializer = oldVersion.serializer.map(_.newInstance())))
+
         // Handle projects that create conflicting aliases.
         case oldVersion @ Project(projectList, _)
             if findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 0b6874e3b8..a3881ff920 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -769,6 +769,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("mapped dataset should resolve duplicated attributes for self join") {
+    val ds = Seq(1, 2, 3).toDS().map(_ + 1)
+    val ds1 = ds.as("d1")
+    val ds2 = ds.as("d2")
+
+    checkDataset(ds1.joinWith(ds2, $"d1.value" === $"d2.value"), (2, 2), (3, 3), (4, 4))
+    checkDataset(ds1.intersect(ds2), 2, 3, 4)
+    checkDataset(ds1.except(ds1))
+  }
+
   test("SPARK-15441: Dataset outer join") {
     val left = Seq(ClassData("a", 1), ClassData("b", 2)).toDS().as("left")
     val right = Seq(ClassData("x", 2), ClassData("y", 3)).toDS().as("right")
-- 
GitLab