diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index e5d1a1e2996c579d664d0a8dfbbac97422158501..1235204591bbde229688c6c1c1cac2978b299158 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -90,6 +90,12 @@ trait PredicateHelper { * Returns true iff `expr` could be evaluated as a condition within join. */ protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match { + case l: ListQuery => + // A ListQuery defines the query which we want to search in an IN subquery expression. + // Currently the only way to evaluate an IN subquery is to convert it to a + // LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule. + // It cannot be evaluated as part of a Join operator. + false case e: SubqueryExpression => // non-correlated subquery will be replaced as literal e.children.isEmpty diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 6feea4060f46ab94e15e4d4cdf991362fca00e7b..d846786473eb0c3388ee26ec26e80b1586633798 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -836,6 +836,26 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, answer) } + test("SPARK-20094: don't push predicate with IN subquery into join condition") { + val x = testRelation.subquery('x) + val z = testRelation.subquery('z) + val w = testRelation1.subquery('w) + + val queryPlan = x + .join(z) + .where(("x.b".attr === "z.b".attr) && + ("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr))))) + .analyze + + val expectedPlan = x + .join(z, Inner, Some("x.b".attr === "z.b".attr)) + .where("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr)))) + .analyze + + val optimized = Optimize.execute(queryPlan) + comparePlans(optimized, expectedPlan) + } + test("Window: predicate push down -- basic") { val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))