From 64fbd1cef30033efd83570a752980ea658ee12bb Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh <viirya@gmail.com>
Date: Tue, 26 Sep 2017 15:23:13 +0800
Subject: [PATCH] [SPARK-22124][SQL] Sample and Limit should also defer input
 evaluation under codegen

## What changes were proposed in this pull request?

We can override `usedInputs` to claim that an operator defers input evaluation. `Sample` and `Limit` are two operators which should claim it but don't. We should do it.

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19345 from viirya/SPARK-22124.
---
 .../apache/spark/sql/execution/basicPhysicalOperators.scala   | 4 ++++
 .../src/main/scala/org/apache/spark/sql/execution/limit.scala | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 18142c44f0..8389e2f3d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -267,6 +267,10 @@ case class SampleExec(
     }
   }
 
+  // Mark this as empty. This plan doesn't need to evaluate any inputs and can defer the evaluation
+  // to the parent operator.
+  override def usedInputs: AttributeSet = AttributeSet.empty
+
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     child.asInstanceOf[CodegenSupport].inputRDDs()
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 73a0f8735e..1f515e29b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -62,6 +62,10 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
     child.asInstanceOf[CodegenSupport].inputRDDs()
   }
 
+  // Mark this as empty. This plan doesn't need to evaluate any inputs and can defer the evaluation
+  // to the parent operator.
+  override def usedInputs: AttributeSet = AttributeSet.empty
+
   protected override def doProduce(ctx: CodegenContext): String = {
     child.asInstanceOf[CodegenSupport].produce(ctx, this)
   }
-- 
GitLab