diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 368e9a539630b62feca8be6a2ca6b991a4a35a7f..08fb0199fc36fba603ecc20676849e2042569d35 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1128,19 +1128,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be - // pushed beneath must satisfy the following two conditions: + // pushed beneath must satisfy the following conditions: // 1. All the expressions are part of window partitioning key. The expressions can be compound. - // 2. Deterministic + // 2. Deterministic. + // 3. Placed before any non-deterministic predicates. case filter @ Filter(condition, w: Window) if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => - cond.references.subsetOf(partitionAttrs) && cond.deterministic && - // This is for ensuring all the partitioning expressions have been converted to alias - // in Analyzer. Thus, we do not need to check if the expressions in conditions are - // the same as the expressions used in partitioning columns. - partitionAttrs.forall(_.isInstanceOf[Attribute]) + + val (candidates, containingNonDeterministic) = + splitConjunctivePredicates(condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => + cond.references.subsetOf(partitionAttrs) } + + val stayUp = rest ++ containingNonDeterministic + if (pushDown.nonEmpty) { val pushDownPredicate = pushDown.reduce(And) val newWindow = w.copy(child = Filter(pushDownPredicate, w.child)) @@ -1159,11 +1163,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // For each filter, expand the alias and check if the filter can be evaluated using // attributes produced by the aggregate operator's child operator. - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => + val (candidates, containingNonDeterministic) = + splitConjunctivePredicates(condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => val replaced = replaceAlias(cond, aliasMap) - replaced.references.subsetOf(aggregate.child.outputSet) && replaced.deterministic + replaced.references.subsetOf(aggregate.child.outputSet) } + val stayUp = rest ++ containingNonDeterministic + if (pushDown.nonEmpty) { val pushDownPredicate = pushDown.reduce(And) val replaced = replaceAlias(pushDownPredicate, aliasMap) @@ -1177,9 +1186,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { case filter @ Filter(condition, union: Union) => // Union could change the rows, so non-deterministic predicate can't be pushed down - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => - cond.deterministic - } + val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic) + if (pushDown.nonEmpty) { val pushDownCond = pushDown.reduceLeft(And) val output = union.output @@ -1219,9 +1227,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // come from grandchild. // TODO: non-deterministic predicates could be pushed through some operators that do not change // the rows. - val (pushDown, stayUp) = splitConjunctivePredicates(filter.condition).partition { cond => - cond.deterministic && cond.references.subsetOf(grandchild.outputSet) + val (candidates, containingNonDeterministic) = + splitConjunctivePredicates(filter.condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => + cond.references.subsetOf(grandchild.outputSet) } + + val stayUp = rest ++ containingNonDeterministic + if (pushDown.nonEmpty) { val newChild = insertFilter(pushDown.reduceLeft(And)) if (stayUp.nonEmpty) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 9cb49e74ad34f0b321672a93e006ad3c725402dd..780e78ed1cf2c4aeb99282189434b60686aa9887 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -531,14 +531,14 @@ class FilterPushdownSuite extends PlanTest { val originalQuery = { testRelationWithArrayType .generate(Explode('c_arr), true, false, Some("arr")) - .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6)) + .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('c > 6)) } val optimized = Optimize.execute(originalQuery.analyze) val correctAnswer = { testRelationWithArrayType .where('b >= 5) .generate(Explode('c_arr), true, false, Some("arr")) - .where('a + Rand(10).as("rnd") > 6) + .where('a + Rand(10).as("rnd") > 6 && 'c > 6) .analyze } @@ -715,14 +715,14 @@ class FilterPushdownSuite extends PlanTest { val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int) val originalQuery = Union(Seq(testRelation, testRelation2)) - .where('a === 2L && 'b + Rand(10).as("rnd") === 3) + .where('a === 2L && 'b + Rand(10).as("rnd") === 3 && 'c > 5L) val optimized = Optimize.execute(originalQuery.analyze) val correctAnswer = Union(Seq( testRelation.where('a === 2L), testRelation2.where('d === 2L))) - .where('b + Rand(10).as("rnd") === 3) + .where('b + Rand(10).as("rnd") === 3 && 'c > 5L) .analyze comparePlans(optimized, correctAnswer)