From 10b59ba230cb426f2a5d43cd0a4964a556e24c3f Mon Sep 17 00:00:00 2001
From: Takuya UESHIN <ueshin@happy-camper.st>
Date: Thu, 10 Jul 2014 19:27:24 -0700
Subject: [PATCH] [SPARK-2428][SQL] Add except and intersect methods to
 SchemaRDD.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #1355 from ueshin/issues/SPARK-2428 and squashes the following commits:

b6fa264 [Takuya UESHIN] Add except and intersect methods to SchemaRDD.
---
 .../org/apache/spark/sql/SchemaRDD.scala      | 20 ++++++++++++++++++
 .../org/apache/spark/sql/DslQuerySuite.scala  | 21 +++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 8bcfc7c064..0c95b66854 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -256,6 +256,26 @@ class SchemaRDD(
   def unionAll(otherPlan: SchemaRDD) =
     new SchemaRDD(sqlContext, Union(logicalPlan, otherPlan.logicalPlan))
 
+  /**
+   * Performs a relational except on two SchemaRDDs
+   *
+   * @param otherPlan the [[SchemaRDD]] that should be excepted from this one.
+   *
+   * @group Query
+   */
+  def except(otherPlan: SchemaRDD): SchemaRDD =
+    new SchemaRDD(sqlContext, Except(logicalPlan, otherPlan.logicalPlan))
+
+  /**
+   * Performs a relational intersect on two SchemaRDDs
+   *
+   * @param otherPlan the [[SchemaRDD]] that should be intersected with this one.
+   *
+   * @group Query
+   */
+  def intersect(otherPlan: SchemaRDD): SchemaRDD =
+    new SchemaRDD(sqlContext, Intersect(logicalPlan, otherPlan.logicalPlan))
+
   /**
    * Filters tuples using a function over the value of the specified column.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
index 04ac008682..68dae58728 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -168,4 +168,25 @@ class DslQuerySuite extends QueryTest {
   test("zero count") {
     assert(emptyTableData.count() === 0)
   }
+
+  test("except") {
+    checkAnswer(
+      lowerCaseData.except(upperCaseData),
+      (1, "a") ::
+      (2, "b") ::
+      (3, "c") ::
+      (4, "d") :: Nil)
+    checkAnswer(lowerCaseData.except(lowerCaseData), Nil)
+    checkAnswer(upperCaseData.except(upperCaseData), Nil)
+  }
+
+  test("intersect") {
+    checkAnswer(
+      lowerCaseData.intersect(lowerCaseData),
+      (1, "a") ::
+      (2, "b") ::
+      (3, "c") ::
+      (4, "d") :: Nil)
+    checkAnswer(lowerCaseData.intersect(upperCaseData), Nil)
+  }
 }
-- 
GitLab