diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ee7de8692149635e676a3169c0a3ee84a27f885e..dbe3ded4bbf1572cabc3f1277f7da278ed60373a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -597,12 +597,14 @@ object CollapseRepartition extends Rule[LogicalPlan] {
 
 /**
  * Collapse Adjacent Window Expression.
- * - If the partition specs and order specs are the same, collapse into the parent.
+ * - If the partition specs and order specs are the same and the window expression are
+ *   independent, collapse into the parent.
  */
 object CollapseWindow extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 =>
-      w.copy(windowExpressions = we2 ++ we1, child = grandChild)
+    case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
+        if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty =>
+      w1.copy(windowExpressions = we2 ++ we1, child = grandChild)
   }
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
index 3f7d1d9fd99afa3c8cc6c76bb8589a164cf71a44..52054c2f8bd8de123c8c391a8c8b6e28c1117244 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -78,4 +78,15 @@ class CollapseWindowSuite extends PlanTest {
 
     comparePlans(optimized2, correctAnswer2)
   }
+
+  test("Don't collapse adjacent windows with dependent columns") {
+    val query = testRelation
+      .window(Seq(sum(a).as('sum_a)), partitionSpec1, orderSpec1)
+      .window(Seq(max('sum_a).as('max_sum_a)), partitionSpec1, orderSpec1)
+      .analyze
+
+    val expected = query.analyze
+    val optimized = Optimize.execute(query.analyze)
+    comparePlans(optimized, expected)
+  }
 }