From a46b8f2d710d82ba3a212cac64b610a67b8798f9 Mon Sep 17 00:00:00 2001
From: Andrew Or <andrewor14@gmail.com>
Date: Thu, 28 Aug 2014 17:05:21 -0700
Subject: [PATCH] [SPARK-3277] Fix external spilling with LZ4 assertion error

**Summary of the changes**

The bulk of this PR consists of tests and documentation; the actual fix is a single line of code (see `BlockObjectWriter.scala`). We currently do not run the `External*` test suites with different compression codecs, and doing so would have caught the bug reported in [SPARK-3277](https://issues.apache.org/jira/browse/SPARK-3277). This PR extends the existing tests to cover spilling with every compression codec known to Spark, including `LZ4`.
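
As a rough illustration of the test pattern this PR adds (a minimal sketch only; `runSpillingTest` is a hypothetical stand-in for the real suite's test body, and the codec short names are assumed to match Spark's registry):

```scala
object SpillingWithAllCodecsSketch {
  // Codec short names assumed to be registered in Spark's CompressionCodec;
  // "lz4" is the one that exposed SPARK-3277.
  val allCodecs = Seq("lzf", "snappy", "lz4")

  // Hypothetical stand-in for the real test body, which would set
  // spark.io.compression.codec and run jobs that force spilling.
  def runSpillingTest(codec: Option[String]): Unit =
    println(s"spilling with codec = ${codec.getOrElse("uncompressed")}")

  def main(args: Array[String]): Unit = {
    runSpillingTest(None)                              // spilling without compression
    allCodecs.foreach(c => runSpillingTest(Some(c)))   // and once per codec
  }
}
```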

**The bug itself**

In `DiskBlockObjectWriter`, we only report the shuffle bytes written before we close the streams. With `LZ4`, the bytes written reported by our metrics were all 0, apparently because `flush()` does not force the codec to write out its buffered data. In general, compression codecs may write additional bytes to the file after we call `close()`, so we must also capture those bytes in our shuffle write metrics.
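
For illustration only, here is a minimal, self-contained sketch of the effect using the JDK's `GZIPOutputStream` in place of Spark's LZ4 stream (the codec, file name, and sizes are arbitrary): measuring the file size only before `close()` misses the bytes the codec emits when it is closed, which is what the added line accounts for.

```scala
import java.io.{File, FileOutputStream}
import java.util.zip.GZIPOutputStream

object PostCloseBytesSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("spill", ".gz")
    val out = new GZIPOutputStream(new FileOutputStream(file))

    out.write(Array.fill[Byte](1000)(42.toByte))
    out.flush()
    // What the old metrics saw: the file size at the last update before close().
    val reportedPosition = file.length()

    out.close()
    // The codec flushes buffered data and writes its trailer on close(),
    // so the file keeps growing after the last metrics update.
    val finalPosition = file.length()

    // The fix adds this difference to shuffleBytesWritten instead of dropping it.
    println(s"reported=$reportedPosition final=$finalPosition " +
      s"unreported=${finalPosition - reportedPosition}")
    file.delete()
  }
}
```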

Thanks to mridulm and pwendell for their help with debugging.

Author: Andrew Or <andrewor14@gmail.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes #2187 from andrewor14/fix-lz4-spilling and squashes the following commits:

1b54bdc [Andrew Or] Speed up tests by not compressing everything
1c4624e [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-lz4-spilling
6b2e7d1 [Andrew Or] Fix compilation error
92e251b [Patrick Wendell] Better documentation for BlockObjectWriter.
a1ad536 [Andrew Or] Fix tests
089593f [Andrew Or] Actually fix SPARK-3277 (tests still fail)
4bbcf68 [Andrew Or] Update tests to actually test all compression codecs
b264a84 [Andrew Or] ExternalAppendOnlyMapSuite code style fixes (minor)
1bfa743 [Andrew Or] Add more information to assert for better debugging
---
 .../apache/spark/io/CompressionCodec.scala    |   1 +
 .../spark/storage/BlockObjectWriter.scala     |  37 +++-
 .../collection/ExternalAppendOnlyMap.scala    |   7 +-
 .../spark/scheduler/ReplayListenerSuite.scala |   5 +-
 .../ExternalAppendOnlyMapSuite.scala          | 190 ++++++++++--------
 5 files changed, 144 insertions(+), 96 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index ef9c43ecf1..1ac7f4e448 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -64,6 +64,7 @@ private[spark] object CompressionCodec {
   }
 
   val DEFAULT_COMPRESSION_CODEC = "snappy"
+  val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
 }
 
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index adda971fd7..9c469370ff 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -65,8 +65,6 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 
 /**
  * BlockObjectWriter which writes directly to a file on disk. Appends to the given file.
- * The given write metrics will be updated incrementally, but will not necessarily be current until
- * commitAndClose is called.
  */
 private[spark] class DiskBlockObjectWriter(
     blockId: BlockId,
@@ -75,6 +73,8 @@ private[spark] class DiskBlockObjectWriter(
     bufferSize: Int,
     compressStream: OutputStream => OutputStream,
     syncWrites: Boolean,
+    // These write metrics are concurrently shared with other active BlockObjectWriters that
+    // are themselves performing writes, so all updates must be relative.
     writeMetrics: ShuffleWriteMetrics)
   extends BlockObjectWriter(blockId)
   with Logging
@@ -94,14 +94,30 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+  private var initialized = false
+
+  /**
+   * Cursors used to represent positions in the file.
+   *
+   * xxxxxxxx|--------|---       |
+   *         ^        ^          ^
+   *         |        |        finalPosition
+   *         |      reportedPosition
+   *       initialPosition
+   *
+   * initialPosition: Offset in the file where we start writing. Immutable.
+   * reportedPosition: Position at the time of the last update to the write metrics.
+   * finalPosition: Offset where we stopped writing. Set on commitAndClose(), then never changed.
+   * -----: Current writes to the underlying file.
+   * xxxxx: Existing contents of the file.
+   */
   private val initialPosition = file.length()
   private var finalPosition: Long = -1
-  private var initialized = false
+  private var reportedPosition = initialPosition
 
   /** Calling channel.position() to update the write metrics can be a little bit expensive, so we
     * only call it every N writes */
   private var writesSinceMetricsUpdate = 0
-  private var lastPosition = initialPosition
 
   override def open(): BlockObjectWriter = {
     fos = new FileOutputStream(file, true)
@@ -140,17 +156,18 @@ private[spark] class DiskBlockObjectWriter(
       //       serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
-      updateBytesWritten()
       close()
     }
     finalPosition = file.length()
+    // Some compression codecs write additional bytes to the file after close() is called
+    writeMetrics.shuffleBytesWritten += (finalPosition - reportedPosition)
   }
 
   // Discard current writes. We do this by flushing the outstanding writes and then
   // truncating the file to its initial position.
   override def revertPartialWritesAndClose() {
     try {
-      writeMetrics.shuffleBytesWritten -= (lastPosition - initialPosition)
+      writeMetrics.shuffleBytesWritten -= (reportedPosition - initialPosition)
 
       if (initialized) {
         objOut.flush()
@@ -189,10 +206,14 @@ private[spark] class DiskBlockObjectWriter(
     new FileSegment(file, initialPosition, finalPosition - initialPosition)
   }
 
+  /**
+   * Report the number of bytes written since the last update to this writer's shuffle write
+   * metrics. Note that this is only valid before the underlying streams are closed.
+   */
   private def updateBytesWritten() {
     val pos = channel.position()
-    writeMetrics.shuffleBytesWritten += (pos - lastPosition)
-    lastPosition = pos
+    writeMetrics.shuffleBytesWritten += (pos - reportedPosition)
+    reportedPosition = pos
   }
 
   private def callWithTiming(f: => Unit) = {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 9f85b94a70..8a015c1d26 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -413,7 +413,12 @@ class ExternalAppendOnlyMap[K, V, C](
     extends Iterator[(K, C)]
   {
     private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // Size will be batchSize.length + 1
-    assert(file.length() == batchOffsets(batchOffsets.length - 1))
+    assert(file.length() == batchOffsets.last,
+      "File length is not equal to the last batch offset:\n" +
+      s"    file length = ${file.length}\n" +
+      s"    last batch offset = ${batchOffsets.last}\n" +
+      s"    all batch offsets = ${batchOffsets.mkString(",")}"
+    )
 
     private var batchIndex = 0  // Which batch we're in
     private var fileStream: FileInputStream = null
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index d81499ac6a..6b6e0104e5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -33,10 +33,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  */
 class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
   private val fileSystem = Utils.getHadoopFileSystem("/")
-  private val allCompressionCodecs = Seq[String](
-    "org.apache.spark.io.LZFCompressionCodec",
-    "org.apache.spark.io.SnappyCompressionCodec"
-  )
+  private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
   private var testDir: File = _
 
   before {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 04d7338488..ac3931e3d0 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -23,37 +23,42 @@ import org.scalatest.FunSuite
 
 import org.apache.spark._
 import org.apache.spark.SparkContext._
+import org.apache.spark.io.CompressionCodec
 
 class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
+  private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
+  private def createCombiner[T](i: T) = ArrayBuffer[T](i)
+  private def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = buffer += i
+  private def mergeCombiners[T](buf1: ArrayBuffer[T], buf2: ArrayBuffer[T]): ArrayBuffer[T] =
+    buf1 ++= buf2
 
-  private def createCombiner(i: Int) = ArrayBuffer[Int](i)
-  private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
-  private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
+  private def createExternalMap[T] = new ExternalAppendOnlyMap[T, T, ArrayBuffer[T]](
+    createCombiner[T], mergeValue[T], mergeCombiners[T])
 
-  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+  private def createSparkConf(loadDefaults: Boolean, codec: Option[String] = None): SparkConf = {
     val conf = new SparkConf(loadDefaults)
     // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
     // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
     conf.set("spark.serializer.objectStreamReset", "1")
     conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
+    codec.foreach { c => conf.set("spark.io.compression.codec", c) }
     // Ensure that we actually have multiple batches per spill file
     conf.set("spark.shuffle.spill.batchSize", "10")
     conf
   }
 
   test("simple insert") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)
-
-    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map = createExternalMap[Int]
 
     // Single insert
     map.insert(1, 10)
     var it = map.iterator
     assert(it.hasNext)
     val kv = it.next()
-    assert(kv._1 == 1 && kv._2 == ArrayBuffer[Int](10))
+    assert(kv._1 === 1 && kv._2 === ArrayBuffer[Int](10))
     assert(!it.hasNext)
 
     // Multiple insert
@@ -61,18 +66,17 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     map.insert(3, 30)
     it = map.iterator
     assert(it.hasNext)
-    assert(it.toSet == Set[(Int, ArrayBuffer[Int])](
+    assert(it.toSet === Set[(Int, ArrayBuffer[Int])](
       (1, ArrayBuffer[Int](10)),
       (2, ArrayBuffer[Int](20)),
       (3, ArrayBuffer[Int](30))))
+    sc.stop()
   }
 
   test("insert with collision") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)
-
-    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map = createExternalMap[Int]
 
     map.insertAll(Seq(
       (1, 10),
@@ -84,30 +88,28 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     val it = map.iterator
     assert(it.hasNext)
     val result = it.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
-    assert(result == Set[(Int, Set[Int])](
+    assert(result === Set[(Int, Set[Int])](
       (1, Set[Int](10, 100, 1000)),
       (2, Set[Int](20, 200)),
       (3, Set[Int](30))))
+    sc.stop()
   }
 
   test("ordering") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)
 
-    val map1 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map1 = createExternalMap[Int]
     map1.insert(1, 10)
     map1.insert(2, 20)
     map1.insert(3, 30)
 
-    val map2 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map2 = createExternalMap[Int]
     map2.insert(2, 20)
     map2.insert(3, 30)
     map2.insert(1, 10)
 
-    val map3 = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map3 = createExternalMap[Int]
     map3.insert(3, 30)
     map3.insert(1, 10)
     map3.insert(2, 20)
@@ -119,33 +121,33 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     var kv1 = it1.next()
     var kv2 = it2.next()
     var kv3 = it3.next()
-    assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
-    assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+    assert(kv1._1 === kv2._1 && kv2._1 === kv3._1)
+    assert(kv1._2 === kv2._2 && kv2._2 === kv3._2)
 
     kv1 = it1.next()
     kv2 = it2.next()
     kv3 = it3.next()
-    assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
-    assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+    assert(kv1._1 === kv2._1 && kv2._1 === kv3._1)
+    assert(kv1._2 === kv2._2 && kv2._2 === kv3._2)
 
     kv1 = it1.next()
     kv2 = it2.next()
     kv3 = it3.next()
-    assert(kv1._1 == kv2._1 && kv2._1 == kv3._1)
-    assert(kv1._2 == kv2._2 && kv2._2 == kv3._2)
+    assert(kv1._1 === kv2._1 && kv2._1 === kv3._1)
+    assert(kv1._2 === kv2._2 && kv2._2 === kv3._2)
+    sc.stop()
   }
 
   test("null keys and values") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)
 
-    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map = createExternalMap[Int]
     map.insert(1, 5)
     map.insert(2, 6)
     map.insert(3, 7)
     assert(map.size === 3)
-    assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+    assert(map.iterator.toSet === Set[(Int, Seq[Int])](
       (1, Seq[Int](5)),
       (2, Seq[Int](6)),
       (3, Seq[Int](7))
@@ -155,7 +157,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     val nullInt = null.asInstanceOf[Int]
     map.insert(nullInt, 8)
     assert(map.size === 4)
-    assert(map.iterator.toSet == Set[(Int, Seq[Int])](
+    assert(map.iterator.toSet === Set[(Int, Seq[Int])](
       (1, Seq[Int](5)),
       (2, Seq[Int](6)),
       (3, Seq[Int](7)),
@@ -167,32 +169,34 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     map.insert(nullInt, nullInt)
     assert(map.size === 5)
     val result = map.iterator.toSet[(Int, ArrayBuffer[Int])].map(kv => (kv._1, kv._2.toSet))
-    assert(result == Set[(Int, Set[Int])](
+    assert(result === Set[(Int, Set[Int])](
       (1, Set[Int](5)),
       (2, Set[Int](6)),
       (3, Set[Int](7)),
       (4, Set[Int](nullInt)),
       (nullInt, Set[Int](nullInt, 8))
     ))
+    sc.stop()
   }
 
   test("simple aggregator") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)
 
     // reduceByKey
     val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))
     val result1 = rdd.reduceByKey(_+_).collect()
-    assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5)))
+    assert(result1.toSet === Set[(Int, Int)]((0, 5), (1, 5)))
 
     // groupByKey
     val result2 = rdd.groupByKey().collect().map(x => (x._1, x._2.toList)).toSet
-    assert(result2.toSet == Set[(Int, Seq[Int])]
+    assert(result2.toSet === Set[(Int, Seq[Int])]
       ((0, List[Int](1, 1, 1, 1, 1)), (1, List[Int](1, 1, 1, 1, 1))))
+    sc.stop()
   }
 
   test("simple cogroup") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(loadDefaults = false)
     sc = new SparkContext("local", "test", conf)
     val rdd1 = sc.parallelize(1 to 4).map(i => (i, i))
     val rdd2 = sc.parallelize(1 to 4).map(i => (i%2, i))
@@ -200,77 +204,98 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
 
     result.foreach { case (i, (seq1, seq2)) =>
       i match {
-        case 0 => assert(seq1.toSet == Set[Int]() && seq2.toSet == Set[Int](2, 4))
-        case 1 => assert(seq1.toSet == Set[Int](1) && seq2.toSet == Set[Int](1, 3))
-        case 2 => assert(seq1.toSet == Set[Int](2) && seq2.toSet == Set[Int]())
-        case 3 => assert(seq1.toSet == Set[Int](3) && seq2.toSet == Set[Int]())
-        case 4 => assert(seq1.toSet == Set[Int](4) && seq2.toSet == Set[Int]())
+        case 0 => assert(seq1.toSet === Set[Int]() && seq2.toSet === Set[Int](2, 4))
+        case 1 => assert(seq1.toSet === Set[Int](1) && seq2.toSet === Set[Int](1, 3))
+        case 2 => assert(seq1.toSet === Set[Int](2) && seq2.toSet === Set[Int]())
+        case 3 => assert(seq1.toSet === Set[Int](3) && seq2.toSet === Set[Int]())
+        case 4 => assert(seq1.toSet === Set[Int](4) && seq2.toSet === Set[Int]())
       }
     }
+    sc.stop()
   }
 
   test("spilling") {
-    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    testSimpleSpilling()
+  }
+
+  test("spilling with compression") {
+    // Keep track of which compression codec we're using to report in test failure messages
+    var lastCompressionCodec: Option[String] = None
+    try {
+      allCompressionCodecs.foreach { c =>
+        lastCompressionCodec = Some(c)
+        testSimpleSpilling(Some(c))
+      }
+    } catch {
+      // Include compression codec used in test failure message
+      // We need to catch Throwable here because assertion failures are not covered by Exceptions
+      case t: Throwable =>
+        val compressionMessage = lastCompressionCodec
+          .map { c => "with compression using codec " + c }
+          .getOrElse("without compression")
+        val newException = new Exception(s"Test failed $compressionMessage:\n\n${t.getMessage}")
+        newException.setStackTrace(t.getStackTrace)
+        throw newException
+    }
+  }
+
+  /**
+   * Test spilling through simple aggregations and cogroups.
+   * If a compression codec is provided, use it. Otherwise, do not compress spills.
+   */
+  private def testSimpleSpilling(codec: Option[String] = None): Unit = {
+    val conf = createSparkConf(loadDefaults = true, codec)  // Load defaults for Spark home
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
     // reduceByKey - should spill ~8 times
     val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
     val resultA = rddA.reduceByKey(math.max).collect()
-    assert(resultA.length == 50000)
-    resultA.foreach { case(k, v) =>
-      if (v != k * 2 + 1) {
-        fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
-      }
+    assert(resultA.length === 50000)
+    resultA.foreach { case (k, v) =>
+      assert(v === k * 2 + 1, s"Value for $k was wrong: expected ${k * 2 + 1}, got $v")
     }
 
     // groupByKey - should spill ~17 times
     val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
     val resultB = rddB.groupByKey().collect()
-    assert(resultB.length == 25000)
-    resultB.foreach { case(i, seq) =>
+    assert(resultB.length === 25000)
+    resultB.foreach { case (i, seq) =>
       val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
-      if (seq.toSet != expected) {
-        fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
-      }
+      assert(seq.toSet === expected,
+        s"Value for $i was wrong: expected $expected, got ${seq.toSet}")
     }
 
     // cogroup - should spill ~7 times
     val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
     val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
     val resultC = rddC1.cogroup(rddC2).collect()
-    assert(resultC.length == 10000)
-    resultC.foreach { case(i, (seq1, seq2)) =>
+    assert(resultC.length === 10000)
+    resultC.foreach { case (i, (seq1, seq2)) =>
       i match {
         case 0 =>
-          assert(seq1.toSet == Set[Int](0))
-          assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
+          assert(seq1.toSet === Set[Int](0))
+          assert(seq2.toSet === Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
         case 1 =>
-          assert(seq1.toSet == Set[Int](1))
-          assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
+          assert(seq1.toSet === Set[Int](1))
+          assert(seq2.toSet === Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
         case 5000 =>
-          assert(seq1.toSet == Set[Int](5000))
-          assert(seq2.toSet == Set[Int]())
+          assert(seq1.toSet === Set[Int](5000))
+          assert(seq2.toSet === Set[Int]())
         case 9999 =>
-          assert(seq1.toSet == Set[Int](9999))
-          assert(seq2.toSet == Set[Int]())
+          assert(seq1.toSet === Set[Int](9999))
+          assert(seq2.toSet === Set[Int]())
         case _ =>
       }
     }
+    sc.stop()
   }
 
   test("spilling with hash collisions") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(loadDefaults = true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
-    def createCombiner(i: String) = ArrayBuffer[String](i)
-    def mergeValue(buffer: ArrayBuffer[String], i: String) = buffer += i
-    def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String]) =
-      buffer1 ++= buffer2
-
-    val map = new ExternalAppendOnlyMap[String, String, ArrayBuffer[String]](
-      createCombiner, mergeValue, mergeCombiners)
+    val map = createExternalMap[String]
 
     val collisionPairs = Seq(
       ("Aa", "BB"),                   // 2112
@@ -312,13 +337,13 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       count += 1
     }
     assert(count === 100000 + collisionPairs.size * 2)
+    sc.stop()
   }
 
   test("spilling with many hash collisions") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(loadDefaults = true)
     conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
     val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
 
     // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
@@ -337,15 +362,14 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       count += 1
     }
     assert(count === 10000)
+    sc.stop()
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(loadDefaults = true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
-    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
-      createCombiner, mergeValue, mergeCombiners)
+    val map = createExternalMap[Int]
 
     (1 to 100000).foreach { i => map.insert(i, i) }
     map.insert(Int.MaxValue, Int.MaxValue)
@@ -355,15 +379,14 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       // Should not throw NoSuchElementException
       it.next()
     }
+    sc.stop()
   }
 
   test("spilling with null keys and values") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(loadDefaults = true)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
-
-    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
-      createCombiner, mergeValue, mergeCombiners)
+    val map = createExternalMap[Int]
 
     map.insertAll((1 to 100000).iterator.map(i => (i, i)))
     map.insert(null.asInstanceOf[Int], 1)
@@ -375,6 +398,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       // Should not throw NullPointerException
       it.next()
     }
+    sc.stop()
   }
 
 }
-- 
GitLab