From a92e095e705155ea10c8311f7856b964d654626a Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun <dongjoon@apache.org>
Date: Mon, 12 Jun 2017 20:58:27 +0800
Subject: [PATCH] [SPARK-21041][SQL] SparkSession.range should be consistent
 with SparkContext.range

## What changes were proposed in this pull request?

This PR fixes an inconsistency between `SparkSession.range` and `SparkContext.range`: for arguments that describe an empty range (for example, `start > end` with a positive `step`), `SparkSession.range` overflowed and returned spurious rows, while `SparkContext.range` correctly returns an empty result. The fix makes `RangeExec` short-circuit to an `EmptyRDD` when the range is provably empty.
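
The emptiness check added to `RangeExec.inputRDDs` (see the diff below) can be read as a standalone predicate. A minimal sketch, assuming a nonzero `step` (`isEmptyRange` is an illustrative name, not part of the patch):

```scala
// A range is empty when start == end, or when the direction of the bounds
// disagrees with the sign of the step (XOR of the two comparisons).
// Assumes step != 0, which never reaches this code path in practice.
def isEmptyRange(start: Long, end: Long, step: Long): Boolean =
  start == end || ((start < end) ^ (0 < step))

isEmptyRange(Long.MaxValue - 3, Long.MinValue + 2, 1)  // true: start > end but step > 0
isEmptyRange(0L, 10L, 1L)                              // false: ordinary ascending range
isEmptyRange(10L, 0L, -1L)                             // false: ordinary descending range
```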

**BEFORE**
```scala
scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect
res2: Array[Long] = Array(9223372036854775804, 9223372036854775805, 9223372036854775806)
```

**AFTER**
```scala
scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect
res2: Array[Long] = Array()
```
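
For reference, `SparkContext.range` already returns an empty result for the same arguments, which is the behavior this patch aligns with (REPL output reproduced for illustration):

```scala
scala> sc.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect
res3: Array[Long] = Array()
```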

## How was this patch tested?

Passes Jenkins with the newly added test cases in `DataFrameRangeSuite`.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #18257 from dongjoon-hyun/SPARK-21041.
---
 .../spark/sql/execution/basicPhysicalOperators.scala  | 10 +++++++---
 .../org/apache/spark/sql/DataFrameRangeSuite.scala    | 11 +++++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index f69a688555..04c1303143 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -21,7 +21,7 @@ import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
 import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext}
-import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
+import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
@@ -347,8 +347,12 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
-      .map(i => InternalRow(i)) :: Nil
+    val rdd = if (start == end || (start < end ^ 0 < step)) {
+      new EmptyRDD[InternalRow](sqlContext.sparkContext)
+    } else {
+      sqlContext.sparkContext.parallelize(0 until numSlices, numSlices).map(i => InternalRow(i))
+    }
+    rdd :: Nil
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 7b495656b9..45afbd29d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -191,6 +191,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
       checkAnswer(sql("SELECT * FROM range(3)"), Row(0) :: Row(1) :: Row(2) :: Nil)
     }
   }
+
+  test("SPARK-21041 SparkSession.range()'s behavior is inconsistent with SparkContext.range()") {
+    val start = java.lang.Long.MAX_VALUE - 3
+    val end = java.lang.Long.MIN_VALUE + 2
+    Seq("false", "true").foreach { value =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value) {
+        assert(spark.range(start, end, 1).collect.length == 0)
+        assert(spark.range(start, start, 1).collect.length == 0)
+      }
+    }
+  }
 }
 
 object DataFrameRangeSuite {
-- 
GitLab