diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index b10785b05d6c7ff8d3c643401794552329c9b372..f14df93160b756ff6c02f4f54cff1def4ba6cc96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -24,7 +24,7 @@ import scala.math.BigDecimal.RoundingMode
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 
@@ -174,10 +174,16 @@ case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) extends Lo
       case InSet(ar: Attribute, set) =>
         evaluateInSet(ar, set, update)
 
-      case IsNull(ar: Attribute) =>
+      // At this stage, we don't have advanced statistics such as sketches or histograms.
+      // As a result, some operators cannot estimate `nullCount` accurately, e.g. left outer
+      // join estimation does not currently update `nullCount`.
+      // Therefore, we only estimate IsNull and IsNotNull predicates when the child is a
+      // leaf node, whose `nullCount` is accurate.
+      // This limitation stems from the lack of advanced stats and should be removed later.
+      case IsNull(ar: Attribute) if plan.child.isInstanceOf[LeafNode] =>
         evaluateNullCheck(ar, isNull = true, update)
 
-      case IsNotNull(ar: Attribute) =>
+      case IsNotNull(ar: Attribute) if plan.child.isInstanceOf[LeafNode] =>
         evaluateNullCheck(ar, isNull = false, update)
 
       case _ =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
index 4691913c8c9863ed7a5823f25dfc8ddb4145c740..07abe1ed285335986cf45bb141964936e16a86ab 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.statsEstimation
 import java.sql.Date
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, Statistics}
+import org.apache.spark.sql.catalyst.plans.LeftOuter
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, Join, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
 import org.apache.spark.sql.types._
 
@@ -340,6 +341,32 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
       expectedRowCount = 2)
   }
 
+  // This is a limitation test; remove it once the limitation above is lifted.
+  test("don't estimate IsNull or IsNotNull if the child is a non-leaf node") {
+    val attrIntLargerRange = AttributeReference("c1", IntegerType)()
+    val colStatIntLargerRange = ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
+      nullCount = 10, avgLen = 4, maxLen = 4)
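+    // The non-zero nullCount ensures that an (incorrect) estimation of IsNull/IsNotNull
+    // below would visibly change the expected row count.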
+    val smallerTable = childStatsTestPlan(Seq(attrInt), 10L)
+    val largerTable = StatsTestPlan(
+      outputList = Seq(attrIntLargerRange),
+      rowCount = 30,
+      attributeStats = AttributeMap(Seq(attrIntLargerRange -> colStatIntLargerRange)))
+    val nonLeafChild = Join(largerTable, smallerTable, LeftOuter,
+      Some(EqualTo(attrIntLargerRange, attrInt)))
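+    // The Join above is not a LeafNode, so the IsNull/IsNotNull guard should skip
+    // estimation and simply propagate the join's statistics.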
+
+    Seq(IsNull(attrIntLargerRange), IsNotNull(attrIntLargerRange)).foreach { predicate =>
+      validateEstimatedStats(
+        Filter(predicate, nonLeafChild),
+        // column stats don't change
+        Seq(attrInt -> colStatInt, attrIntLargerRange -> colStatIntLargerRange),
+        expectedRowCount = 30)
+    }
+  }
+
   private def childStatsTestPlan(outList: Seq[Attribute], tableRowCount: BigInt): StatsTestPlan = {
     StatsTestPlan(
       outputList = outList,