From 2861b07bb030f72769f5b757b4a7d4a635807140 Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Sat, 29 Mar 2014 22:02:53 -0700
Subject: [PATCH] [SQL] SPARK-1354 Fix self-joins of parquet relations

@AndreSchumacher, please take a look.

https://spark-project.atlassian.net/browse/SPARK-1354

Author: Michael Armbrust <michael@databricks.com>

Closes #269 from marmbrus/parquetJoin and squashes the following commits:

4081e77 [Michael Armbrust] Create new instances of Parquet relation when multiple copies are in a single plan.
---
 .../spark/sql/parquet/ParquetRelation.scala   | 15 +++++++++++++--
 .../spark/sql/parquet/ParquetQuerySuite.scala | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 2b825f84ee..67a34e1f21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -36,7 +36,7 @@ import parquet.schema.{MessageType, MessageTypeParser}
 import parquet.schema.{PrimitiveType => ParquetPrimitiveType}
 import parquet.schema.{Type => ParquetType}
 
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
 import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.types._
@@ -54,7 +54,8 @@ import org.apache.spark.sql.catalyst.types._
  * @param tableName The name of the relation that can be used in queries.
  * @param path The path to the Parquet file.
  */
-case class ParquetRelation(tableName: String, path: String) extends BaseRelation {
+case class ParquetRelation(tableName: String, path: String)
+  extends BaseRelation with MultiInstanceRelation {
 
   /** Schema derived from ParquetFile **/
   def parquetSchema: MessageType =
@@ -74,6 +75,16 @@ case class ParquetRelation(tableName: String, path: String) extends BaseRelation
   // Parquet files have no concepts of keys, therefore no Partitioner
   // Note: we could allow Block level access; needs to be thought through
   override def isPartitioned = false
+
+  override def newInstance = ParquetRelation(tableName, path).asInstanceOf[this.type]
+
+  // Equals must also take into account the output attributes so that we can distinguish between
+  // different instances of the same relation,
+  override def equals(other: Any) = other match {
+    case p: ParquetRelation =>
+      p.tableName == tableName && p.path == path && p.output == output
+    case _ => false
+  }
 }
 
 object ParquetRelation {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 71caa709af..ea1733b361 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -30,6 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 import org.apache.spark.sql.test.TestSQLContext
 
+// Implicits
+import org.apache.spark.sql.test.TestSQLContext._
+
 class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
   override def beforeAll() {
     ParquetTestData.writeFile()
@@ -39,6 +42,22 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
     ParquetTestData.testFile.delete()
   }
 
+  test("self-join parquet files") {
+    val x = ParquetTestData.testData.subquery('x)
+    val y = ParquetTestData.testData.subquery('y)
+    val query = x.join(y).where("x.myint".attr === "y.myint".attr)
+
+    // Check to make sure that the attributes from either side of the join have unique expression
+    // ids.
+    query.queryExecution.analyzed.output.filter(_.name == "myint") match {
+      case Seq(i1, i2) if(i1.exprId == i2.exprId) =>
+        fail(s"Duplicate expression IDs found in query plan: $query")
+      case Seq(_, _) => // All good
+    }
+
+    // TODO: We can't run this query as it NPEs
+  }
+
   test("Import of simple Parquet file") {
     val result = getRDD(ParquetTestData.testData).collect()
     assert(result.size === 15)
-- 
GitLab