From 5d54d71ddbac1fbb26925a8c9138bbb8c0e81db8 Mon Sep 17 00:00:00 2001
From: Cheng Hao <hao.cheng@intel.com>
Date: Mon, 11 Aug 2014 20:45:14 -0700
Subject: [PATCH] [SQL] [SPARK-2826] Reduce the memory copy while building the
 hashmap for HashOuterJoin

This is a follow up for #1147 , this PR will improve the performance about 10% - 15% in my local tests.
```
Before:
LeftOuterJoin: took 16750 ms ([3000000] records)
LeftOuterJoin: took 15179 ms ([3000000] records)
RightOuterJoin: took 15515 ms ([3000000] records)
RightOuterJoin: took 15276 ms ([3000000] records)
FullOuterJoin: took 19150 ms ([6000000] records)
FullOuterJoin: took 18935 ms ([6000000] records)

After:
LeftOuterJoin: took 15218 ms ([3000000] records)
LeftOuterJoin: took 13503 ms ([3000000] records)
RightOuterJoin: took 13663 ms ([3000000] records)
RightOuterJoin: took 14025 ms ([3000000] records)
FullOuterJoin: took 16624 ms ([6000000] records)
FullOuterJoin: took 16578 ms ([6000000] records)
```

Besides the performance improvement, I also do some clean up as suggested in #1147

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1765 from chenghao-intel/hash_outer_join_fixing and squashes the following commits:

ab1f9e0 [Cheng Hao] Reduce the memory copy while building the hashmap
---
 .../apache/spark/sql/execution/joins.scala    | 54 ++++++++++---------
 1 file changed, 28 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index ea075f8c65..c86811e838 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.{HashMap => JavaHashMap}
+
 import scala.collection.mutable.{ArrayBuffer, BitSet}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent._
@@ -136,14 +138,6 @@ trait HashJoin {
   }
 }
 
-/**
- * Constant Value for Binary Join Node
- */
-object HashOuterJoin {
-  val DUMMY_LIST = Seq[Row](null)
-  val EMPTY_LIST = Seq[Row]()
-}
-
 /**
  * :: DeveloperApi ::
  * Performs a hash based outer join for two child relations by shuffling the data using 
@@ -181,6 +175,9 @@ case class HashOuterJoin(
     }
   }
 
+  @transient private[this] lazy val DUMMY_LIST = Seq[Row](null)
+  @transient private[this] lazy val EMPTY_LIST = Seq.empty[Row]
+
   // TODO we need to rewrite all of the iterators with our own implementation instead of the Scala
   // iterator for performance purpose. 
 
@@ -199,8 +196,8 @@ case class HashOuterJoin(
         joinedRow.copy
       } else {
         Nil
-      }) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
-        // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
+      }) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
+        // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
         // as we don't know whether we need to append it until finish iterating all of the 
         // records in right side.
         // If we didn't get any proper row, then append a single row with empty right
@@ -224,8 +221,8 @@ case class HashOuterJoin(
         joinedRow.copy
       } else {
         Nil
-      }) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
-        // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
+      }) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
+        // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
         // as we don't know whether we need to append it until finish iterating all of the 
         // records in left side.
         // If we didn't get any proper row, then append a single row with empty left.
@@ -259,10 +256,10 @@ case class HashOuterJoin(
             rightMatchedSet.add(idx)
             joinedRow.copy
           }
-        } ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
+        } ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
           // 2. For those unmatched records in left, append additional records with empty right.
 
-          // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
+          // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
           // as we don't know whether we need to append it until finish iterating all 
           // of the records in right side.
           // If we didn't get any proper row, then append a single row with empty right.
@@ -287,18 +284,22 @@ case class HashOuterJoin(
   }
 
   private[this] def buildHashTable(
-      iter: Iterator[Row], keyGenerator: Projection): Map[Row, ArrayBuffer[Row]] = {
-    // TODO: Use Spark's HashMap implementation.
-    val hashTable = scala.collection.mutable.Map[Row, ArrayBuffer[Row]]()
+      iter: Iterator[Row], keyGenerator: Projection): JavaHashMap[Row, ArrayBuffer[Row]] = {
+    val hashTable = new JavaHashMap[Row, ArrayBuffer[Row]]()
     while (iter.hasNext) {
       val currentRow = iter.next()
       val rowKey = keyGenerator(currentRow)
 
-      val existingMatchList = hashTable.getOrElseUpdate(rowKey, {new ArrayBuffer[Row]()})
+      var existingMatchList = hashTable.get(rowKey)
+      if (existingMatchList == null) {
+        existingMatchList = new ArrayBuffer[Row]()
+        hashTable.put(rowKey, existingMatchList)
+      }
+
       existingMatchList += currentRow.copy()
     }
-    
-    hashTable.toMap[Row, ArrayBuffer[Row]]
+
+    hashTable
   }
 
   def execute() = {
@@ -309,21 +310,22 @@ case class HashOuterJoin(
       // Build HashMap for current partition in right relation
       val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
 
+      import scala.collection.JavaConversions._
       val boundCondition = 
         condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
       joinType match {
         case LeftOuter => leftHashTable.keysIterator.flatMap { key =>
-          leftOuterIterator(key, leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST), 
-            rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+          leftOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST), 
+            rightHashTable.getOrElse(key, EMPTY_LIST))
         }
         case RightOuter => rightHashTable.keysIterator.flatMap { key =>
-          rightOuterIterator(key, leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST), 
-            rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+          rightOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST), 
+            rightHashTable.getOrElse(key, EMPTY_LIST))
         }
         case FullOuter => (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
           fullOuterIterator(key, 
-            leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST), 
-            rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+            leftHashTable.getOrElse(key, EMPTY_LIST), 
+            rightHashTable.getOrElse(key, EMPTY_LIST))
         }
         case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType")
       }
-- 
GitLab