diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 93762ad1b91c20e963e781fd613ddf110f9196b8..11cd84b396ff45aa504f444d029e9246c9cfaa7b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -94,6 +94,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       FoldablePropagation,
       OptimizeIn(conf),
       ConstantFolding,
+      ReorderAssociativeOperator,
       LikeSimplification,
       BooleanSimplification,
       SimplifyConditionals,
@@ -737,6 +738,44 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
   }
 }
 
+/**
+ * Reorder associative integral-type operators and fold all constants into one.
+ */
+object ReorderAssociativeOperator extends Rule[LogicalPlan] {
+  private def flattenAdd(e: Expression): Seq[Expression] = e match {
+    case Add(l, r) => flattenAdd(l) ++ flattenAdd(r)
+    case other => other :: Nil
+  }
+
+  private def flattenMultiply(e: Expression): Seq[Expression] = e match {
+    case Multiply(l, r) => flattenMultiply(l) ++ flattenMultiply(r)
+    case other => other :: Nil
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case q: LogicalPlan => q transformExpressionsDown {
+      case a: Add if a.deterministic && a.dataType.isInstanceOf[IntegralType] =>
+        val (foldables, others) = flattenAdd(a).partition(_.foldable)
+        if (foldables.size > 1) {
+          val foldableExpr = foldables.reduce((x, y) => Add(x, y))
+          val c = Literal.create(foldableExpr.eval(EmptyRow), a.dataType)
+          if (others.isEmpty) c else Add(others.reduce((x, y) => Add(x, y)), c)
+        } else {
+          a
+        }
+      case m: Multiply if m.deterministic && m.dataType.isInstanceOf[IntegralType] =>
+        val (foldables, others) = flattenMultiply(m).partition(_.foldable)
+        if (foldables.size > 1) {
+          val foldableExpr = foldables.reduce((x, y) => Multiply(x, y))
+          val c = Literal.create(foldableExpr.eval(EmptyRow), m.dataType)
+          if (others.isEmpty) c else Multiply(others.reduce((x, y) => Multiply(x, y)), c)
+        } else {
+          m
+        }
+    }
+  }
+}
+
 /**
  * Replaces [[Expression Expressions]] that can be statically evaluated with
  * equivalent [[Literal]] values.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..05e15e9ec4728cb577982554caeb4bd4be58fce8
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ReorderAssociativeOperatorSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("ReorderAssociativeOperator", Once,
+        ReorderAssociativeOperator) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  test("Reorder associative operators") {
+    val originalQuery =
+      testRelation
+        .select(
+          (Literal(3) + ((Literal(1) + 'a) + 2)) + 4,
+          'b * 1 * 2 * 3 * 4,
+          ('b + 1) * 2 * 3 * 4,
+          'a + 1 + 'b + 2 + 'c + 3,
+          'a + 1 + 'b * 2 + 'c + 3,
+          Rand(0) * 1 * 2 * 3 * 4)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer =
+      testRelation
+        .select(
+          ('a + 10).as("((3 + ((1 + a) + 2)) + 4)"),
+          ('b * 24).as("((((b * 1) * 2) * 3) * 4)"),
+          (('b + 1) * 24).as("((((b + 1) * 2) * 3) * 4)"),
+          ('a + 'b + 'c + 6).as("(((((a + 1) + b) + 2) + c) + 3)"),
+          ('a + 'b * 2 + 'c + 4).as("((((a + 1) + (b * 2)) + c) + 3)"),
+          Rand(0) * 1 * 2 * 3 * 4)
+        .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}