diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f5daba1543da9b308858a72345f08a5f4c9c8288..ca17f3e3d06ffa143ee5100dc996896c68a9af21 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -79,7 +79,6 @@ class Analyzer( ExtractWindowExpressions :: GlobalAggregates :: UnresolvedHavingClauseAttributes :: - RemoveEvaluationFromSort :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once, @@ -955,63 +954,6 @@ class Analyzer( Project(p.output, newPlan.withNewChildren(newChild :: Nil)) } } - - /** - * Removes all still-need-evaluate ordering expressions from sort and use an inner project to - * materialize them, finally use a outer project to project them away to keep the result same. - * Then we can make sure we only sort by [[AttributeReference]]s. - * - * As an example, - * {{{ - * Sort('a, 'b + 1, - * Relation('a, 'b)) - * }}} - * will be turned into: - * {{{ - * Project('a, 'b, - * Sort('a, '_sortCondition, - * Project('a, 'b, ('b + 1).as("_sortCondition"), - * Relation('a, 'b)))) - * }}} - */ - object RemoveEvaluationFromSort extends Rule[LogicalPlan] { - private def hasAlias(expr: Expression) = { - expr.find { - case a: Alias => true - case _ => false - }.isDefined - } - - override def apply(plan: LogicalPlan): LogicalPlan = plan transform { - // The ordering expressions have no effect to the output schema of `Sort`, - // so `Alias`s in ordering expressions are unnecessary and we should remove them. - case s @ Sort(ordering, _, _) if ordering.exists(hasAlias) => - val newOrdering = ordering.map(_.transformUp { - case Alias(child, _) => child - }.asInstanceOf[SortOrder]) - s.copy(order = newOrdering) - - case s @ Sort(ordering, global, child) - if s.expressions.forall(_.resolved) && s.childrenResolved && !s.hasNoEvaluation => - - val (ref, needEval) = ordering.partition(_.child.isInstanceOf[AttributeReference]) - - val namedExpr = needEval.map(_.child match { - case n: NamedExpression => n - case e => Alias(e, "_sortCondition")() - }) - - val newOrdering = ref ++ needEval.zip(namedExpr).map { case (order, ne) => - order.copy(child = ne.toAttribute) - } - - // Add still-need-evaluate ordering expressions into inner project and then project - // them away after the sort. - Project(child.output, - Sort(newOrdering, global, - Project(child.output ++ namedExpr, child))) - } - } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index aacfc86ab0e49df4d9d76dc725248cac7595edc9..7c404722d811c368e8d30ccdd22a357db8cea620 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -34,7 +34,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend }.nonEmpty ) - expressions.forall(_.resolved) && childrenResolved && !hasSpecialExpressions + !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions } } @@ -68,7 +68,7 @@ case class Generate( generator.resolved && childrenResolved && generator.elementTypes.length == generatorOutput.length && - generatorOutput.forall(_.resolved) + !generatorOutput.exists(!_.resolved) } // we don't want the gOutput to be taken as part of the expressions @@ -188,7 +188,7 @@ case class WithWindowDefinition( } /** - * @param order The ordering expressions, should all be [[AttributeReference]] + * @param order The ordering expressions * @param global True means global sorting apply for entire data set, * False means sorting only apply within the partition. * @param child Child logical plan @@ -198,11 +198,6 @@ case class Sort( global: Boolean, child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output - - def hasNoEvaluation: Boolean = order.forall(_.child.isInstanceOf[AttributeReference]) - - override lazy val resolved: Boolean = - expressions.forall(_.resolved) && childrenResolved && hasNoEvaluation } case class Aggregate( @@ -217,7 +212,7 @@ case class Aggregate( }.nonEmpty ) - expressions.forall(_.resolved) && childrenResolved && !hasWindowExpressions + !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions } lazy val newAggregation: Option[Aggregate] = Utils.tryConvert(this) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index a86cefe941e8e3b73ee59220a781fe12e8ed8acb..221b4e92f086cce0d6f431b9234eb1c10e037d74 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -165,39 +165,11 @@ class AnalysisSuite extends AnalysisTest { test("pull out nondeterministic expressions from Sort") { val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation) - val analyzed = caseSensitiveAnalyzer.execute(plan) - analyzed.transform { - case s: Sort if s.expressions.exists(!_.deterministic) => - fail("nondeterministic expressions are not allowed in Sort") - } - } - - test("remove still-need-evaluate ordering expressions from sort") { - val a = testRelation2.output(0) - val b = testRelation2.output(1) - - def makeOrder(e: Expression): SortOrder = SortOrder(e, Ascending) - - val noEvalOrdering = makeOrder(a) - val noEvalOrderingWithAlias = makeOrder(Alias(Alias(b, "name1")(), "name2")()) - - val needEvalExpr = Coalesce(Seq(a, Literal("1"))) - val needEvalExpr2 = Coalesce(Seq(a, b)) - val needEvalOrdering = makeOrder(needEvalExpr) - val needEvalOrdering2 = makeOrder(needEvalExpr2) - - val plan = Sort( - Seq(noEvalOrdering, noEvalOrderingWithAlias, needEvalOrdering, needEvalOrdering2), - false, testRelation2) - - val evaluatedOrdering = makeOrder(AttributeReference("_sortCondition", StringType)()) - val materializedExprs = Seq(needEvalExpr, needEvalExpr2).map(e => Alias(e, "_sortCondition")()) - + val projected = Alias(Rand(33), "_nondeterministic")() val expected = - Project(testRelation2.output, - Sort(Seq(makeOrder(a), makeOrder(b), evaluatedOrdering, evaluatedOrdering), false, - Project(testRelation2.output ++ materializedExprs, testRelation2))) - + Project(testRelation.output, + Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false, + Project(testRelation.output :+ projected, testRelation))) checkAnalysis(plan, expected) } }