diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index f69a688555bbf3a90160fe14686febcc79d0ad5f..04c130314388ac47e29dbe08f595fe89467cb97b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -21,7 +21,7 @@ import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
 import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext}
-import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
+import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
@@ -347,8 +347,12 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
-      .map(i => InternalRow(i)) :: Nil
+    val rdd = if (start == end || (start < end ^ 0 < step)) {
+      new EmptyRDD[InternalRow](sqlContext.sparkContext)
+    } else {
+      sqlContext.sparkContext.parallelize(0 until numSlices, numSlices).map(i => InternalRow(i))
+    }
+    rdd :: Nil
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 7b495656b93d7463746e31e3ee773b413b58f0a5..45afbd29d1907954ef4074e3a3b2f24981326e15 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -191,6 +191,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
       checkAnswer(sql("SELECT * FROM range(3)"), Row(0) :: Row(1) :: Row(2) :: Nil)
     }
   }
+
+  test("SPARK-21041 SparkSession.range()'s behavior is inconsistent with SparkContext.range()") {
+    val start = java.lang.Long.MAX_VALUE - 3
+    val end = java.lang.Long.MIN_VALUE + 2
+    Seq("false", "true").foreach { value =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value) {
+        assert(spark.range(start, end, 1).collect.length == 0)
+        assert(spark.range(start, start, 1).collect.length == 0)
+      }
+    }
+  }
 }
 
 object DataFrameRangeSuite {