From 1a7d7cc85fb24de21f1cde67d04467171b82e845 Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Sat, 12 Jul 2014 12:13:32 -0700
Subject: [PATCH] [SPARK-2405][SQL] Reusue same byte buffers when creating new
 instance of InMemoryRelation

Reuse byte buffers when creating unique attributes for multiple instances of an InMemoryRelation in a single query plan.

Author: Michael Armbrust <michael@databricks.com>

Closes #1332 from marmbrus/doubleCache and squashes the following commits:

4a19609 [Michael Armbrust] Clean up concurrency story by calculating buffersn the constructor.
b39c931 [Michael Armbrust] Allocations are kind of a side effect.
f67eff7 [Michael Armbrust] Reusue same byte buffers when creating new instance of InMemoryRelation
---
 .../analysis/MultiInstanceRelation.scala      |  2 +-
 .../columnar/InMemoryColumnarTableScan.scala  | 35 +++++++++++++------
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index a6ce90854d..22941edef2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  * of itself with globally unique expression ids.
  */
 trait MultiInstanceRelation {
-  def newInstance: this.type
+  def newInstance(): this.type
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index e1e4f24c6c..ff7f664d8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.columnar
 
+import java.nio.ByteBuffer
+
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -26,22 +29,19 @@ import org.apache.spark.SparkConf
 
 object InMemoryRelation {
   def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, child)
+    new InMemoryRelation(child.output, useCompression, child)()
 }
 
 private[sql] case class InMemoryRelation(
     output: Seq[Attribute],
     useCompression: Boolean,
     child: SparkPlan)
+    (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
   extends LogicalPlan with MultiInstanceRelation {
 
-  override def children = Seq.empty
-  override def references = Set.empty
-
-  override def newInstance() =
-    new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type]
-
-  lazy val cachedColumnBuffers = {
+  // If the cached column buffers were not passed in, we calculate them in the constructor.
+  // As in Spark, the actual work of caching is lazy.
+  if (_cachedColumnBuffers == null) {
     val output = child.output
     val cached = child.execute().mapPartitions { iterator =>
       val columnBuilders = output.map { attribute =>
@@ -62,10 +62,23 @@ private[sql] case class InMemoryRelation(
     }.cache()
 
     cached.setName(child.toString)
-    // Force the materialization of the cached RDD.
-    cached.count()
-    cached
+    _cachedColumnBuffers = cached
   }
+
+
+  override def children = Seq.empty
+
+  override def references = Set.empty
+
+  override def newInstance() = {
+    new InMemoryRelation(
+      output.map(_.newInstance),
+      useCompression,
+      child)(
+      _cachedColumnBuffers).asInstanceOf[this.type]
+  }
+
+  def cachedColumnBuffers = _cachedColumnBuffers
 }
 
 private[sql] case class InMemoryColumnarTableScan(
-- 
GitLab