From b9192bb3ffc319ebee7dbd15c24656795e454749 Mon Sep 17 00:00:00 2001
From: Ryan Blue <blue@apache.org>
Date: Tue, 8 Nov 2016 23:47:48 -0800
Subject: [PATCH] [SPARK-18368] Fix regexp_replace with task serialization.

## What changes were proposed in this pull request?

This makes the result value both transient and lazy, so that if the RegExpReplace object is initialized then serialized, `result: StringBuffer` will be correctly initialized.

## How was this patch tested?

* Verified that this patch fixed the query that found the bug.
* Added a test case that fails without the fix.

Author: Ryan Blue <blue@apache.org>

Closes #15816 from rdblue/SPARK-18368-fix-regexp-replace.
---
 .../catalyst/expressions/regexpExpressions.scala  |  2 +-
 .../expressions/ExpressionEvalHelper.scala        | 15 +++++++++------
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
index 5648ad6b6d..4896a6225a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
@@ -230,7 +230,7 @@ case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expressio
   @transient private var lastReplacement: String = _
   @transient private var lastReplacementInUTF8: UTF8String = _
   // result buffer write by Matcher
-  @transient private val result: StringBuffer = new StringBuffer
+  @transient private lazy val result: StringBuffer = new StringBuffer
 
   override def nullSafeEval(s: Any, p: Any, r: Any): Any = {
     if (!p.equals(lastRegex)) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 9ceb709185..f83650424a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -22,7 +22,8 @@ import org.scalactic.TripleEqualsSupport.Spread
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.prop.GeneratorDrivenPropertyChecks
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
@@ -43,13 +44,15 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
 
   protected def checkEvaluation(
       expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
+    val serializer = new JavaSerializer(new SparkConf()).newInstance
+    val expr: Expression = serializer.deserialize(serializer.serialize(expression))
     val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
-    checkEvaluationWithoutCodegen(expression, catalystValue, inputRow)
-    checkEvaluationWithGeneratedMutableProjection(expression, catalystValue, inputRow)
-    if (GenerateUnsafeProjection.canSupport(expression.dataType)) {
-      checkEvalutionWithUnsafeProjection(expression, catalystValue, inputRow)
+    checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
+    checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
+    if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
+      checkEvalutionWithUnsafeProjection(expr, catalystValue, inputRow)
     }
-    checkEvaluationWithOptimization(expression, catalystValue, inputRow)
+    checkEvaluationWithOptimization(expr, catalystValue, inputRow)
   }
 
   /**
-- 
GitLab