diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ddd4b3755d6293ade54bb3838ca0b26f26ca1a3b..a4133feae816641bd64172a850117c5ab668c6d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -40,12 +40,80 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
       SimplifyCasts,
       SimplifyCaseConversionExpressions) ::
     Batch("Filter Pushdown", FixedPoint(100),
+      UnionPushdown,
       CombineFilters,
       PushPredicateThroughProject,
       PushPredicateThroughJoin,
       ColumnPruning) :: Nil
 }
 
+/**
+ * Pushes operations (currently Filter and Project) down into both children of a Union.
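+ *
+ * A minimal illustration (the plans and attribute names are hypothetical; a' denotes
+ * the right child's attribute corresponding to a):
+ * {{{
+ *   Filter(a > 1, Union(left, right))
+ *     ==>  Union(Filter(a > 1, left), Filter(a' > 1, right))
+ * }}}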
+ */
+object UnionPushdown extends Rule[LogicalPlan] {
+
+  /**
+   * Maps each Attribute from the left side to the corresponding Attribute on the right side.
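+   *
+   * For example (hypothetical schemas):
+   * {{{
+   *   left.output  == Seq(a, b)      right.output == Seq(d, e)
+   *   buildRewrites(union)  ==>  AttributeMap(a -> d, b -> e)
+   * }}}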
+   */
+  def buildRewrites(union: Union): AttributeMap[Attribute] = {
+    assert(union.left.output.size == union.right.output.size)
+
+    AttributeMap(union.left.output.zip(union.right.output))
+  }
+
+  /**
+   * Rewrites an expression so that it can be pushed to the right side of a Union operator.
+   * This method relies on the fact that the output attributes of a Union are always equal
+   * to the left child's output.
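+   *
+   * For example (hypothetical attributes), with rewrites mapping a -> d:
+   * {{{
+   *   pushToRight(a + 1, rewrites)  ==>  d + 1
+   * }}}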
+   */
+  def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]): A = {
+    val result = e transform {
+      case a: Attribute => rewrites(a)
+    }
+
+    // We must promise the compiler that we did not discard the names in the case of project
+    // expressions.  This is safe since the only transformation is from Attribute => Attribute.
+    result.asInstanceOf[A]
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // Push down filter into union
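+    // The condition can be pushed to the left child unchanged, since a Union's output
+    // attributes are exactly the left child's output; for the right child, the condition's
+    // attributes must first be rewritten to the right child's attributes (see pushToRight).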
+    case Filter(condition, u @ Union(left, right)) =>
+      val rewrites = buildRewrites(u)
+      Union(
+        Filter(condition, left),
+        Filter(pushToRight(condition, rewrites), right))
+
+    // Push down projection into union
+    case Project(projectList, u @ Union(left, right)) =>
+      val rewrites = buildRewrites(u)
+      Union(
+        Project(projectList, left),
+        Project(projectList.map(pushToRight(_, rewrites)), right))
+  }
+}
+
 /**
  * Attempts to eliminate the reading of unneeded columns from the query plan using the following
  * transformations:
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..dfef87bd9133d2e44c2e35fe0e7285487972980c
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+class UnionPushdownSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
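+      // Remove analyzer artifacts such as Subquery nodes first, so that UnionPushdown
+      // can match directly on the Union node.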
+      Batch("Subqueries", Once,
+        EliminateAnalysisOperators) ::
+      Batch("Union Pushdown", Once,
+        UnionPushdown) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+  val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
+  val testUnion = Union(testRelation, testRelation2)
+
+  test("union: filter to each side") {
+    val query = testUnion.where('a === 1)
+
+    val optimized = Optimize(query.analyze)
+
+    val correctAnswer =
+      Union(testRelation.where('a === 1), testRelation2.where('d === 1)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("union: project to each side") {
+    val query = testUnion.select('b)
+
+    val optimized = Optimize(query.analyze)
+
+    val correctAnswer =
+      Union(testRelation.select('b), testRelation2.select('e)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}