Skip to content
Snippets Groups Projects
Commit f858f466 authored by Cody Koeninger's avatar Cody Koeninger Committed by Michael Armbrust
Browse files

SPARK-3462 push down filters and projections into Unions

Author: Cody Koeninger <cody.koeninger@mediacrossing.com>

Closes #2345 from koeninger/SPARK-3462 and squashes the following commits:

5c8d24d [Cody Koeninger] SPARK-3462 remove now-unused parameter
0788691 [Cody Koeninger] SPARK-3462 add tests, handle compatible schema with different aliases, per marmbrus feedback
ef47b3b [Cody Koeninger] SPARK-3462 push down filters and projections into Unions
parent ce59725b
No related branches found
No related tags found
No related merge requests found
...@@ -40,12 +40,60 @@ object Optimizer extends RuleExecutor[LogicalPlan] { ...@@ -40,12 +40,60 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
SimplifyCasts, SimplifyCasts,
SimplifyCaseConversionExpressions) :: SimplifyCaseConversionExpressions) ::
Batch("Filter Pushdown", FixedPoint(100), Batch("Filter Pushdown", FixedPoint(100),
UnionPushdown,
CombineFilters, CombineFilters,
PushPredicateThroughProject, PushPredicateThroughProject,
PushPredicateThroughJoin, PushPredicateThroughJoin,
ColumnPruning) :: Nil ColumnPruning) :: Nil
} }
/**
 * Optimizer rule that moves a Filter or a Project sitting directly on top of a Union
 * into both children of that Union.
 */
object UnionPushdown extends Rule[LogicalPlan] {

  /**
   * Pairs every output Attribute of the union's left child with the Attribute found at
   * the same position in the right child's output.
   *
   * Both children must expose the same number of output attributes; this is asserted.
   */
  def buildRewrites(union: Union): AttributeMap[Attribute] = {
    val leftOutput = union.left.output
    val rightOutput = union.right.output
    assert(leftOutput.size == rightOutput.size)
    AttributeMap(leftOutput.zip(rightOutput))
  }

  /**
   * Produces a copy of `e` in which every Attribute has been replaced by its counterpart
   * from `rewrites`, making the expression usable on the right side of a Union.
   * As noted by the original authors, this relies on the output attributes of a union
   * always being equal to the left child's output.
   */
  def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]): A = {
    val rewritten = e.transform {
      case attr: Attribute => rewrites(attr)
    }
    // The compiler only knows that `transform` yields an Expression. Because the sole
    // substitution performed is Attribute => Attribute, named project expressions keep
    // their names, so narrowing the result back to A is safe.
    rewritten.asInstanceOf[A]
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    // Filter over Union: filter each child, rewriting the condition for the right child.
    case Filter(condition, union @ Union(left, right)) =>
      val leftToRight = buildRewrites(union)
      val rightCondition = pushToRight(condition, leftToRight)
      Union(Filter(condition, left), Filter(rightCondition, right))

    // Project over Union: project each child, rewriting the list for the right child.
    case Project(projectList, union @ Union(left, right)) =>
      val leftToRight = buildRewrites(union)
      val rightProjectList = projectList.map(pushToRight(_, leftToRight))
      Union(Project(projectList, left), Project(rightProjectList, right))
  }
}
/** /**
* Attempts to eliminate the reading of unneeded columns from the query plan using the following * Attempts to eliminate the reading of unneeded columns from the query plan using the following
* transformations: * transformations:
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
/**
 * Checks that [[UnionPushdown]] moves filters and projections placed above a Union
 * into both of the Union's children.
 */
class UnionPushdownSuite extends PlanTest {

  /**
   * Executor restricted to the rules under test: the "Subqueries" batch runs
   * EliminateAnalysisOperators once, then the "Union Pushdown" batch runs
   * UnionPushdown once.
   */
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = List(
      Batch("Subqueries", Once, EliminateAnalysisOperators),
      Batch("Union Pushdown", Once, UnionPushdown))
  }

  // Two three-column integer relations with different column names, and their union.
  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
  val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
  val testUnion = Union(testRelation, testRelation2)

  test("union: filter to each side") {
    val filteredUnion = testUnion.where('a === 1)
    val actualPlan = Optimize(filteredUnion.analyze)

    // The right-hand filter refers to 'd, the column at the same position as 'a.
    val leftSide = testRelation.where('a === 1)
    val rightSide = testRelation2.where('d === 1)
    val expectedPlan = Union(leftSide, rightSide).analyze

    comparePlans(actualPlan, expectedPlan)
  }

  test("union: project to each side") {
    val projectedUnion = testUnion.select('b)
    val actualPlan = Optimize(projectedUnion.analyze)

    // The right-hand projection refers to 'e, the column at the same position as 'b.
    val leftSide = testRelation.select('b)
    val rightSide = testRelation2.select('e)
    val expectedPlan = Union(leftSide, rightSide).analyze

    comparePlans(actualPlan, expectedPlan)
  }
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment