From f858f466862541c3faad76a1fa2391f1c17ec9dd Mon Sep 17 00:00:00 2001
From: Cody Koeninger <cody.koeninger@mediacrossing.com>
Date: Thu, 11 Sep 2014 17:49:36 -0700
Subject: [PATCH] SPARK-3462 push down filters and projections into Unions

Author: Cody Koeninger <cody.koeninger@mediacrossing.com>

Closes #2345 from koeninger/SPARK-3462 and squashes the following commits:

5c8d24d [Cody Koeninger] SPARK-3462 remove now-unused parameter
0788691 [Cody Koeninger] SPARK-3462 add tests, handle compatible schema with different aliases, per marmbrus feedback
ef47b3b [Cody Koeninger] SPARK-3462 push down filters and projections into Unions
---
 .../sql/catalyst/optimizer/Optimizer.scala    | 48 ++++++++++++++
 .../optimizer/UnionPushdownSuite.scala        | 62 +++++++++++++++++++
 2 files changed, 110 insertions(+)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ddd4b3755d..a4133feae8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -40,12 +40,60 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
       SimplifyCasts,
       SimplifyCaseConversionExpressions) ::
     Batch("Filter Pushdown", FixedPoint(100),
+      UnionPushdown,
       CombineFilters,
       PushPredicateThroughProject,
       PushPredicateThroughJoin,
       ColumnPruning) :: Nil
 }
 
+/**
+  *  Pushes operations to either side of a Union.
+  */
+object UnionPushdown extends Rule[LogicalPlan] {
+
+  /**
+    *  Maps Attributes from the left side to the corresponding Attribute on the right side.
+    */
+  def buildRewrites(union: Union): AttributeMap[Attribute] = {
+    assert(union.left.output.size == union.right.output.size)
+
+    AttributeMap(union.left.output.zip(union.right.output))
+  }
+
+  /**
+    *  Rewrites an expression so that it can be pushed to the right side of a Union operator.
+    *  This method relies on the fact that the output attributes of a union are always equal
+    *  to the left child's output.
+    */
+  def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]): A = {
+    val result = e transform {
+      case a: Attribute => rewrites(a)
+    }
+
+    // We must promise the compiler that we did not discard the names in the case of project
+    // expressions.  This is safe since the only transformation is from Attribute => Attribute.
+    result.asInstanceOf[A]
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // Push down filter into union
+    case Filter(condition, u @ Union(left, right)) =>
+      val rewrites = buildRewrites(u)
+      Union(
+        Filter(condition, left),
+        Filter(pushToRight(condition, rewrites), right))
+
+    // Push down projection into union
+    case Project(projectList, u @ Union(left, right)) =>
+      val rewrites = buildRewrites(u)
+      Union(
+        Project(projectList, left),
+        Project(projectList.map(pushToRight(_, rewrites)), right))
+  }
+}
+
+
 /**
  * Attempts to eliminate the reading of unneeded columns from the query plan using the following
  * transformations:
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala
new file mode 100644
index 0000000000..dfef87bd91
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+class UnionPushdownSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Subqueries", Once,
+        EliminateAnalysisOperators) ::
+      Batch("Union Pushdown", Once,
+        UnionPushdown) :: Nil
+  }
+
+  val testRelation =  LocalRelation('a.int, 'b.int, 'c.int)
+  val testRelation2 =  LocalRelation('d.int, 'e.int, 'f.int)
+  val testUnion = Union(testRelation, testRelation2)
+
+  test("union: filter to each side") {
+    val query = testUnion.where('a === 1)
+
+    val optimized = Optimize(query.analyze)
+
+    val correctAnswer =
+      Union(testRelation.where('a === 1), testRelation2.where('d === 1)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("union: project to each side") {
+    val query = testUnion.select('b)
+
+    val optimized = Optimize(query.analyze)
+
+    val correctAnswer =
+      Union(testRelation.select('b), testRelation2.select('e)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}
-- 
GitLab