diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index bed48b6f6101b1e2726c64a248dddc7ca08b1d8e..b1b3e00de14564f3cc9e738b74a05953241ba9ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -431,8 +431,11 @@ case class Range( end: Long, step: Long, numSlices: Int, - output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation { - require(step != 0, "step cannot be 0") + output: Seq[Attribute]) + extends LeafNode with MultiInstanceRelation { + + require(step != 0, s"step ($step) cannot be 0") + val numElements: BigInt = { val safeStart = BigInt(start) val safeEnd = BigInt(end) @@ -444,13 +447,20 @@ case class Range( } } - override def newInstance(): Range = - Range(start, end, step, numSlices, output.map(_.newInstance())) + override def newInstance(): Range = copy(output = output.map(_.newInstance())) override def statistics: Statistics = { val sizeInBytes = LongType.defaultSize * numElements Statistics( sizeInBytes = sizeInBytes ) } + + override def simpleString: String = { + if (step == 1) { + s"Range ($start, $end, splits=$numSlices)" + } else { + s"Range ($start, $end, step=$step, splits=$numSlices)" + } + } } case class Aggregate( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 91e2e077cf3ce60492eb1f29e2f205887958c0a1..a4dc03cd8b2605ad3a4e421c53756b092654eb93 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -197,8 +197,8 @@ class SessionCatalogSuite extends SparkFunSuite { test("create temp table") { val catalog = new SessionCatalog(newBasicCatalog()) - val tempTable1 = Range(1, 10, 1, 10, Seq()) - val tempTable2 = Range(1, 20, 2, 10, Seq()) + val tempTable1 = Range(1, 10, 1, 10) + val tempTable2 = Range(1, 20, 2, 10) catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) catalog.createTempView("tbl2", tempTable2, overrideIfExists = false) assert(catalog.getTempTable("tbl1") == Option(tempTable1)) @@ -243,7 +243,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("drop temp table") { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) - val tempTable = Range(1, 10, 2, 10, Seq()) + val tempTable = Range(1, 10, 2, 10) sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) @@ -304,7 +304,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("rename temp table") { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) - val tempTable = Range(1, 10, 2, 10, Seq()) + val tempTable = Range(1, 10, 2, 10) sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable)) @@ -383,7 +383,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("lookup table relation") { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) - val tempTable1 = Range(1, 10, 1, 10, Seq()) + val tempTable1 = Range(1, 10, 1, 10) val metastoreTable1 = externalCatalog.getTable("db2", "tbl1") sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") @@ -422,7 +422,7 @@ class SessionCatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1")))) assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1")))) // If database is explicitly specified, do not check temporary tables - val tempTable = Range(1, 10, 1, 10, Seq()) + val tempTable = Range(1, 10, 1, 10) catalog.createTempView("tbl3", tempTable, overrideIfExists = false) assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) // If database is not explicitly specified, check the current database @@ -434,7 +434,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("list tables without pattern") { val catalog = new SessionCatalog(newBasicCatalog()) - val tempTable = Range(1, 10, 2, 10, Seq()) + val tempTable = Range(1, 10, 2, 10) catalog.createTempView("tbl1", tempTable, overrideIfExists = false) catalog.createTempView("tbl4", tempTable, overrideIfExists = false) assert(catalog.listTables("db1").toSet == @@ -451,7 +451,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("list tables with pattern") { val catalog = new SessionCatalog(newBasicCatalog()) - val tempTable = Range(1, 10, 2, 10, Seq()) + val tempTable = Range(1, 10, 2, 10) catalog.createTempView("tbl1", tempTable, overrideIfExists = false) catalog.createTempView("tbl4", tempTable, overrideIfExists = false) assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 664e7f566145b638f56e8fd039018a54cce7fa8c..555a2f4c01d0e79422b0c32377465d92d658282d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -360,8 +360,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { generator, join = join, outer = outer, g.output, planLater(child)) :: Nil case logical.OneRowRelation => execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil - case r @ logical.Range(start, end, step, numSlices, output) => - execution.RangeExec(start, step, numSlices, r.numElements, output) :: Nil + case r : logical.Range => + execution.RangeExec(r) :: Nil case logical.RepartitionByExpression(expressions, child, nPartitions) => exchange.ShuffleExchange(HashPartitioning( expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index d492fa7c412df6c45f5a7564b8520880cad95c9e..89bde6ad73f3b74f3d4ecc641ecd8b050c1e9d5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.LongType +import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} /** Physical plan for Project. */ @@ -305,22 +305,18 @@ case class SampleExec( /** - * Physical plan for range (generating a range of 64 bit numbers. - * - * @param start first number in the range, inclusive. - * @param step size of the step increment. - * @param numSlices number of partitions. - * @param numElements total number of elements to output. - * @param output output attributes. + * Physical plan for range (generating a range of 64 bit numbers). */ -case class RangeExec( - start: Long, - step: Long, - numSlices: Int, - numElements: BigInt, - output: Seq[Attribute]) +case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) extends LeafExecNode with CodegenSupport { + def start: Long = range.start + def step: Long = range.step + def numSlices: Int = range.numSlices + def numElements: BigInt = range.numElements + + override val output: Seq[Attribute] = range.output + private[sql] override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) @@ -458,6 +454,8 @@ case class RangeExec( } } } + + override def simpleString: String = range.simpleString } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index e4d4cecd5b5dc5b5eeac9cb315664a5d5ccc1d46..cd434f7887db6ceb66ca67fcf5ccc0d306f84b3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -58,7 +58,7 @@ class CatalogSuite } private def createTempTable(name: String): Unit = { - sessionCatalog.createTempView(name, Range(1, 2, 3, 4, Seq()), overrideIfExists = true) + sessionCatalog.createTempView(name, Range(1, 2, 3, 4), overrideIfExists = true) } private def dropTable(name: String, db: Option[String] = None): Unit = {