From 985705301e5e55de14b00ad8ce3143e91aae185d Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@databricks.com>
Date: Sun, 27 Jul 2014 11:20:20 -0700
Subject: [PATCH] SPARK-2684: Update ExternalAppendOnlyMap to take an iterator
 as input

This will decrease object allocation from the "update" closure used in map.changeValue.

Author: Matei Zaharia <matei@databricks.com>

Closes #1607 from mateiz/spark-2684 and squashes the following commits:

b7d89e6 [Matei Zaharia] Add insertAll for Iterables too, and fix some code style
561fc97 [Matei Zaharia] Update ExternalAppendOnlyMap to take an iterator as input
---
 .../scala/org/apache/spark/Aggregator.scala   |  5 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala   |  7 +-
 .../collection/ExternalAppendOnlyMap.scala    | 77 +++++++++++++------
 .../ExternalAppendOnlyMapSuite.scala          | 17 ++--
 4 files changed, 64 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 1d640579ef..ff0ca11749 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -55,10 +55,7 @@ case class Aggregator[K, V, C] (
       combiners.iterator
     } else {
       val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
-      while (iter.hasNext) {
-        val pair = iter.next()
-        combiners.insert(pair._1, pair._2)
-      }
+      combiners.insertAll(iter)
       // TODO: Make this non optional in a future release
       Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
       Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7d96089e52..6388ef82cc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -154,11 +154,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
         map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
     } else {
       val map = createExternalMap(numRdds)
-      rddIterators.foreach { case (it, depNum) =>
-        while (it.hasNext) {
-          val kv = it.next()
-          map.insert(kv._1, new CoGroupValue(kv._2, depNum))
-        }
+      for ((it, depNum) <- rddIterators) {
+        map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
       }
       context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
       context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index c22bb8d9c6..6f263c39d1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -110,42 +110,69 @@ class ExternalAppendOnlyMap[K, V, C](
 
   /**
    * Insert the given key and value into the map.
+   */
+  def insert(key: K, value: V): Unit = {
+    insertAll(Iterator((key, value)))
+  }
+
+  /**
+   * Insert the given iterator of keys and values into the map.
    *
-   * If the underlying map is about to grow, check if the global pool of shuffle memory has
+   * When the underlying map needs to grow, check if the global pool of shuffle memory has
    * enough room for this to happen. If so, allocate the memory required to grow the map;
    * otherwise, spill the in-memory map to disk.
    *
    * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
    */
-  def insert(key: K, value: V) {
+  def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
+    // An update function for the map that we reuse across entries to avoid allocating
+    // a new closure each time
+    var curEntry: Product2[K, V] = null
     val update: (Boolean, C) => C = (hadVal, oldVal) => {
-      if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
+      if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
     }
-    if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
-      val mapSize = currentMap.estimateSize()
-      var shouldSpill = false
-      val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
-
-      // Atomically check whether there is sufficient memory in the global pool for
-      // this map to grow and, if possible, allocate the required amount
-      shuffleMemoryMap.synchronized {
-        val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
-        val availableMemory = maxMemoryThreshold -
-          (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
-
-        // Assume map growth factor is 2x
-        shouldSpill = availableMemory < mapSize * 2
-        if (!shouldSpill) {
-          shuffleMemoryMap(threadId) = mapSize * 2
+
+    while (entries.hasNext) {
+      curEntry = entries.next()
+      if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
+        val mapSize = currentMap.estimateSize()
+        var shouldSpill = false
+        val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+
+        // Atomically check whether there is sufficient memory in the global pool for
+        // this map to grow and, if possible, allocate the required amount
+        shuffleMemoryMap.synchronized {
+          val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
+          val availableMemory = maxMemoryThreshold -
+            (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
+
+          // Assume map growth factor is 2x
+          shouldSpill = availableMemory < mapSize * 2
+          if (!shouldSpill) {
+            shuffleMemoryMap(threadId) = mapSize * 2
+          }
+        }
+        // Do not synchronize spills
+        if (shouldSpill) {
+          spill(mapSize)
         }
       }
-      // Do not synchronize spills
-      if (shouldSpill) {
-        spill(mapSize)
-      }
+      currentMap.changeValue(curEntry._1, update)
+      numPairsInMemory += 1
     }
-    currentMap.changeValue(key, update)
-    numPairsInMemory += 1
+  }
+
+  /**
+   * Insert the given iterable of keys and values into the map.
+   *
+   * When the underlying map needs to grow, check if the global pool of shuffle memory has
+   * enough room for this to happen. If so, allocate the memory required to grow the map;
+   * otherwise, spill the in-memory map to disk.
+   *
+   * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
+   */
+  def insertAll(entries: Iterable[Product2[K, V]]): Unit = {
+    insertAll(entries.iterator)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 428822949c..0b7ad184a4 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -63,12 +63,13 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
       mergeValue, mergeCombiners)
 
-    map.insert(1, 10)
-    map.insert(2, 20)
-    map.insert(3, 30)
-    map.insert(1, 100)
-    map.insert(2, 200)
-    map.insert(1, 1000)
+    map.insertAll(Seq(
+      (1, 10),
+      (2, 20),
+      (3, 30),
+      (1, 100),
+      (2, 200),
+      (1, 1000)))
     val it = map.iterator
     assert(it.hasNext)
     val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
@@ -282,7 +283,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       assert(w1.hashCode === w2.hashCode)
     }
 
-    (1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
+    map.insertAll((1 to 100000).iterator.map(_.toString).map(i => (i, i)))
     collisionPairs.foreach { case (w1, w2) =>
       map.insert(w1, w2)
       map.insert(w2, w1)
@@ -355,7 +356,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
       createCombiner, mergeValue, mergeCombiners)
 
-    (1 to 100000).foreach { i => map.insert(i, i) }
+    map.insertAll((1 to 100000).iterator.map(i => (i, i)))
     map.insert(null.asInstanceOf[Int], 1)
     map.insert(1, null.asInstanceOf[Int])
     map.insert(null.asInstanceOf[Int], null.asInstanceOf[Int])
-- 
GitLab