Skip to content
Snippets Groups Projects
Commit d000ca98 authored by Cheng Lian's avatar Cheng Lian Committed by Reynold Xin
Browse files

[SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on...

[SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan.

In cases like `Limit` and `TakeOrdered`, `executeCollect()` makes optimizations that `execute().collect()` will not.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #939 from liancheng/spark-1958 and squashes the following commits:

bdc4a14 [Cheng Lian] Copy rows to present immutable data to users
8250976 [Cheng Lian] Added return type explicitly for public API
192a25c [Cheng Lian] [SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan.
parent 9a5d482e
No related branches found
No related tags found
No related merge requests found
@@ -368,6 +368,12 @@ class SchemaRDD(
     new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd)))
   }
 
+  // =======================================================================
+  // Overridden RDD actions
+  // =======================================================================
+
+  override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
+
   // =======================================================================
   // Base RDD functions that do NOT change schema
   // =======================================================================
...
@@ -49,7 +49,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
   /**
    * Runs this query returning the result as an array.
    */
-  def executeCollect(): Array[Row] = execute().collect()
+  def executeCollect(): Array[Row] = execute().map(_.copy()).collect()
 
   protected def buildRow(values: Seq[Any]): Row =
     new GenericRow(values.toArray)
...
@@ -201,7 +201,7 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
   }
 
   test("insert (appending) to same table via Scala API") {
-    sql("INSERT INTO testsource SELECT * FROM testsource").collect()
+    sql("INSERT INTO testsource SELECT * FROM testsource")
     val double_rdd = sql("SELECT * FROM testsource").collect()
     assert(double_rdd != null)
     assert(double_rdd.size === 30)
...
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment