Commit 4cc06f4e authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Marcelo Vanzin

[SPARK-18986][CORE] ExternalAppendOnlyMap shouldn't fail when forced to spill before calling its iterator

## What changes were proposed in this pull request?

`ExternalAppendOnlyMap.forceSpill` currently uses an assert to check that the map's `readingIterator` is not null. However, that assertion only holds after the map has been asked for its iterator. Before then, if another memory consumer requests more memory than is currently available, `ExternalAppendOnlyMap.forceSpill` can also be called, and we see a failure like this:

    [info]   java.lang.AssertionError: assertion failed
    [info]   at scala.Predef$.assert(Predef.scala:156)
    [info]   at org.apache.spark.util.collection.ExternalAppendOnlyMap.forceSpill(ExternalAppendOnlyMap.scala:196)
    [info]   at org.apache.spark.util.collection.Spillable.spill(Spillable.scala:111)
    [info]   at org.apache.spark.util.collection.ExternalAppendOnlyMapSuite$$anonfun$13.apply$mcV$sp(ExternalAppendOnlyMapSuite.scala:294)

This fix is motivated by http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-AssertionError-assertion-failed-tc20277.html.
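To illustrate the two states `forceSpill` must handle, here is a minimal, self-contained Scala sketch. The classes here (`SimpleMap`, `MapLike`) are hypothetical stand-ins, not the actual Spark `ExternalAppendOnlyMap` or `SizeTrackingAppendOnlyMap`; they only model the state transition that tripped the old assert:

    // Sketch only: models the "iterator not yet requested" state that the
    // old assert(readingIterator != null) could not handle.
    object ForceSpillSketch {
      final class SimpleMap(var size: Int) // stand-in for SizeTrackingAppendOnlyMap

      class MapLike {
        private var currentMap: SimpleMap = new SimpleMap(0)
        private var readingIterator: Iterator[Int] = null // set only by iterator()

        def insert(n: Int): Unit = currentMap.size += n

        def iterator(): Iterator[Int] = {
          readingIterator = (1 to currentMap.size).iterator
          readingIterator
        }

        // Fixed logic: spill whichever representation currently holds the data.
        def forceSpill(): Boolean = {
          if (readingIterator != null) {
            currentMap = null // iterator owns the data; drop the in-memory map
            true
          } else if (currentMap.size > 0) {
            currentMap = new SimpleMap(0) // spill the map's contents, then reset it
            true
          } else {
            false // nothing to spill
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val m = new MapLike
        m.insert(10)
        // Forced spill *before* iterator() is called: the old code would hit
        // assert(readingIterator != null) here; the fixed code spills the map.
        println(m.forceSpill()) // true
        println(m.forceSpill()) // false: the map is now empty
      }
    }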

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16387 from viirya/fix-externalappendonlymap.
parent 3d0c3af0
    @@ -192,12 +192,19 @@ class ExternalAppendOnlyMap[K, V, C](
        * It will be called by TaskMemoryManager when there is not enough memory for the task.
        */
       override protected[this] def forceSpill(): Boolean = {
    -    assert(readingIterator != null)
    -    val isSpilled = readingIterator.spill()
    -    if (isSpilled) {
    -      currentMap = null
    +    if (readingIterator != null) {
    +      val isSpilled = readingIterator.spill()
    +      if (isSpilled) {
    +        currentMap = null
    +      }
    +      isSpilled
    +    } else if (currentMap.size > 0) {
    +      spill(currentMap)
    +      currentMap = new SizeTrackingAppendOnlyMap[K, C]
    +      true
    +    } else {
    +      false
         }
    -    isSpilled
       }

       /**
    @@ -283,6 +283,17 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
         sc.stop()
       }

    +  test("ExternalAppendOnlyMap shouldn't fail when forced to spill before calling its iterator") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[String]
    +    val consumer = createExternalMap[String]
    +    map.insertAll((1 to size).iterator.map(_.toString).map(i => (i, i)))
    +    assert(map.spill(10000, consumer) == 0L)
    +  }
    +
       test("spilling with hash collisions") {
         val size = 1000
         val conf = createSparkConf(loadDefaults = true)