From a20fea98811d98958567780815fcf0d4fb4e28d4 Mon Sep 17 00:00:00 2001
From: Cheng Hao <hao.cheng@intel.com>
Date: Thu, 15 May 2014 22:12:34 -0700
Subject: [PATCH] [Spark-1461] Deferred Expression Evaluation (short-circuit
 evaluation)

This patch unifies the foldable & nullable interface for Expression.
1) Non-deterministic UDFs (like Rand()) cannot be folded.
2) Short-circuiting significantly improves the performance of Expression evaluation; however, stateful UDFs must not be skipped during short-circuit evaluation (e.g. in the expression col1 > 0 and row_sequence() < 1000, row_sequence() cannot be ignored even if col1 > 0 is false); see the sketch below.
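
A minimal standalone sketch of the intended three-valued, short-circuiting AND semantics (the and3 helper below is illustrative only, not part of Catalyst):

    // Illustrative three-valued AND with left-to-right short-circuit;
    // null models SQL NULL. Not the actual Catalyst And implementation.
    def and3(l: Any, r: => Any): Any = {
      if (l == false) {
        false                 // right side is never evaluated
      } else {
        val rv = r            // forced only when the left side is not false
        if (rv == false) false
        else if (l != null && rv != null) true
        else null
      }
    }

    // and3(false, sys.error("never evaluated"))  -> false
    // and3(null, true)                           -> null
    // and3(true, true)                           -> true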

This borrows the concept of DeferredObject from Hive, which has two kinds of child classes (EagerResult / DeferredResult): the former requires the evaluation to be triggered before it is created, while the latter triggers the evaluation the first time its get() method is called.
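
A minimal sketch of the two flavors, written here as standalone classes rather than the actual Hive DeferredObject implementations:

    // Illustrative stand-ins for the two kinds of children described above.
    class EagerResult(value: Any) {            // value computed before construction
      def get(): Any = value
    }
    class DeferredResult(eval: () => Any) {    // value computed on first get()
      private lazy val value = eval()
      def get(): Any = value
    }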

Author: Cheng Hao <hao.cheng@intel.com>

Closes #446 from chenghao-intel/expression_deferred_evaluation and squashes the following commits:

d2729de [Cheng Hao] Fix the codestyle issues
a08f09c [Cheng Hao] fix bug in or/and short-circuit evaluation
af2236b [Cheng Hao] revert the short-circuit expression evaluation for IF
b7861d2 [Cheng Hao] Add Support for Deferred Expression Evaluation
---
 .../sql/catalyst/expressions/predicates.scala | 47 +++++++++++++------
 .../org/apache/spark/sql/hive/hiveUdfs.scala  | 28 ++++++++---
 2 files changed, 53 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 6ee479939d..d111578530 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -98,13 +98,19 @@ case class And(left: Expression, right: Expression) extends BinaryPredicate {
 
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == false || r == false) {
-      false
-    } else if (l == null || r == null ) {
-      null
+    if (l == false) {
+      false
     } else {
-      true
+      val r = right.eval(input)
+      if (r == false) {
+        false
+      } else {
+        if (l != null && r != null) {
+          true
+        } else {
+          null
+        }
+      }
     }
   }
 }
@@ -114,13 +120,19 @@ case class Or(left: Expression, right: Expression) extends BinaryPredicate {
 
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == true || r == true) {
+    if (l == true) {
       true
-    } else if (l == null || r == null) {
-      null
     } else {
-      false
+      val r = right.eval(input)
+      if (r == true) {
+        true
+      } else {
+        if (l != null && r != null) {
+          false
+        } else {
+          null
+        }
+      }
     }
   }
 }
@@ -133,8 +145,12 @@ case class Equals(left: Expression, right: Expression) extends BinaryComparison
   def symbol = "="
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == null || r == null) null else l == r
+    if (l == null) {
+      null
+    } else {
+      val r = right.eval(input)
+      if (r == null) null else l == r
+    }
   }
 }
 
@@ -162,7 +178,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
     extends Expression {
 
   def children = predicate :: trueValue :: falseValue :: Nil
-  def nullable = trueValue.nullable || falseValue.nullable
+  override def nullable = trueValue.nullable || falseValue.nullable
   def references = children.flatMap(_.references).toSet
   override lazy val resolved = childrenResolved && trueValue.dataType == falseValue.dataType
   def dataType = {
@@ -175,8 +191,9 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
   }
 
   type EvaluatedType = Any
+
   override def eval(input: Row): Any = {
-    if (predicate.eval(input).asInstanceOf[Boolean]) {
+    if (true == predicate.eval(input)) {
       trueValue.eval(input)
     } else {
       falseValue.eval(input)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index d50e2c65b7..5729020423 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -248,17 +248,31 @@ private[hive] case class HiveGenericUdf(name: String, children: Seq[Expression])
     isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable)
   }
 
+  protected lazy val deferredObjects = Array.fill[DeferredObject](children.length)({
+    new DeferredObjectAdapter
+  })
+
+  // Adapter from Catalyst ExpressionResult to Hive DeferredObject
+  class DeferredObjectAdapter extends DeferredObject {
+    private var func: () => Any = _
+    def set(func: () => Any) {
+      this.func = func
+    }
+    override def prepare(i: Int) = {}
+    override def get(): AnyRef = wrap(func())
+  }
+
   val dataType: DataType = inspectorToDataType(returnInspector)
 
   override def eval(input: Row): Any = {
     returnInspector // Make sure initialized.
-    val args = children.map { v =>
-      new DeferredObject {
-        override def prepare(i: Int) = {}
-        override def get(): AnyRef = wrap(v.eval(input))
-      }
-    }.toArray
-    unwrap(function.evaluate(args))
+    var i = 0
+    while (i < children.length) {
+      val idx = i
+      deferredObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => {children(idx).eval(input)})
+      i += 1
+    }
+    unwrap(function.evaluate(deferredObjects))
   }
 }
 
-- 
GitLab