Skip to content
Snippets Groups Projects
Commit 85d3596e authored by Aaron Davidson's avatar Aaron Davidson Committed by Matei Zaharia
Browse files

SPARK-2047: Introduce an in-mem Sorter, and use it to reduce mem usage

### Why and what?
Currently, the AppendOnlyMap performs an "in-place" sort by converting its array of [key, value, key, value] pairs into a an array of [(key, value), (key, value)] pairs. However, this causes us to allocate many Tuple2 objects, which come at a nontrivial overhead.

This patch adds a Sorter API, intended for in memory sorts, which simply ports the Android Timsort implementation (available under Apache v2) and abstracts the interface in a way which introduces no more than 1 virtual function invocation of overhead at each abstraction point.

Please compare our port of the Android Timsort sort with the original implementation: http://www.diffchecker.com/wiwrykcl

### Memory implications
An AppendOnlyMap contains N kv pairs, which results in roughly 2N elements within its underlying array. Each of these elements is 4 bytes wide in a [compressed OOPS](https://wikis.oracle.com/display/HotSpotInternals/CompressedOops) system, which is the default.

Today's approach immediately allocates N Tuple2 objects, which take up 24N bytes in total (exposed via YourKit), and undergoes a Java sort. The Java 6 version immediately copies the entire array (4N bytes here), while the Java 7 version has a worst-case allocation of half the array (2N bytes).
This results in a worst-case sorting overhead of 24N + 2N = 26N bytes (for Java 7).

The Sorter does not require allocating any tuples, but since it uses Timsort, it may copy up to half the entire array in the worst case.
This results in a worst-case sorting overhead of 4N bytes.

Thus, we have reduced the worst-case overhead of the sort by roughly 22 bytes times the number of elements.

### Performance implications
As the destructiveSortedIterator is used for spilling in an ExternalAppendOnlyMap, the purpose of this patch is to provide stability by reducing memory usage rather than improve performance. However, because it implements Timsort, it also brings a substantial performance boost over our prior implementation.

Here are the results of a microbenchmark that sorted 25 million, randomly distributed (Float, Int) pairs. The Java Arrays.sort() tests were run **only on the keys**, and thus moved less data. Our current implementation is called "Tuple-sort using Arrays.sort()" while the new implementation is "KV-array using Sorter".

<table>
<tr><th>Test</th><th>First run (JDK6)</th><th>Average of 10 (JDK6)</th><th>First run (JDK7)</th><th>Average of 10 (JDK7)</th></tr>
<tr><td>primitive Arrays.sort()</td><td>3216 ms</td><td>1190 ms</td><td>2724 ms</td><td>131 ms (!!)</td></tr>
<tr><td>Arrays.sort()</td><td>18564 ms</td><td>2006 ms</td><td>13201 ms</td><td>878 ms</td></tr>
<tr><td>Tuple-sort using Arrays.sort()</td><td>31813 ms</td><td>3550 ms</td><td>20990 ms</td><td>1919 ms</td></tr>
<tr><td><b>KV-array using Sorter</b></td><td></td><td></td><td><b>15020 ms</b></td><td><b>834 ms</b></td></tr>
</table>

The results show that this Sorter performs exactly as expected (after the first run) -- it is as fast as the Java 7 Arrays.sort() (which shares the same algorithm), but is significantly faster than the Tuple-sort on Java 6 or 7.

In short, this patch should significantly improve performance for users running either Java 6 or 7.

Author: Aaron Davidson <aaron@databricks.com>

Closes #1502 from aarondav/sort and squashes the following commits:

652d936 [Aaron Davidson] Update license, move Sorter to java src
a7b5b1c [Aaron Davidson] fix licenses
5c0efaf [Aaron Davidson] Update tmpLength
ec395c8 [Aaron Davidson] Ignore benchmark (again) and fix docs
034bf10 [Aaron Davidson] Change to Apache v2 Timsort
b97296c [Aaron Davidson] Don't try to run benchmark on Jenkins + private[spark]
6307338 [Aaron Davidson] SPARK-2047: Introduce an in-mem Sorter, and use it to reduce mem usage
parent 14078717
No related branches found
No related tags found
No related merge requests found
......@@ -442,7 +442,7 @@ Written by Pavel Binko, Dino Ferrero Merlino, Wolfgang Hoschek, Tony Johnson, An
========================================================================
Fo SnapTree:
For SnapTree:
========================================================================
SNAPTREE LICENSE
......@@ -482,6 +482,24 @@ OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
========================================================================
For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
========================================================================
Copyright (C) 2008 The Android Open Source Project
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
========================================================================
BSD-style licenses
========================================================================
......
This diff is collapsed.
......@@ -254,26 +254,21 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
* Return an iterator of the map in sorted order. This provides a way to sort the map without
* using additional memory, at the expense of destroying the validity of the map.
*/
def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
destroyed = true
// Pack KV pairs into the front of the underlying array
var keyIndex, newIndex = 0
while (keyIndex < capacity) {
if (data(2 * keyIndex) != null) {
data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
data(2 * newIndex) = data(2 * keyIndex)
data(2 * newIndex + 1) = data(2 * keyIndex + 1)
newIndex += 1
}
keyIndex += 1
}
assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
// Sort by the given ordering
val rawOrdering = new Comparator[AnyRef] {
def compare(x: AnyRef, y: AnyRef): Int = {
cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
}
}
Arrays.sort(data, 0, newIndex, rawOrdering)
new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)
new Iterator[(K, V)] {
var i = 0
......@@ -284,7 +279,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
nullValueReady = false
(null.asInstanceOf[K], nullValue)
} else {
val item = data(i).asInstanceOf[(K, V)]
val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
i += 1
item
}
......
......@@ -30,6 +30,7 @@ import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BlockManager}
import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
/**
* :: DeveloperApi ::
......@@ -66,8 +67,6 @@ class ExternalAppendOnlyMap[K, V, C](
blockManager: BlockManager = SparkEnv.get.blockManager)
extends Iterable[(K, C)] with Serializable with Logging {
import ExternalAppendOnlyMap._
private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
private val sparkConf = SparkEnv.get.conf
......@@ -105,7 +104,7 @@ class ExternalAppendOnlyMap[K, V, C](
private var _diskBytesSpilled = 0L
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val comparator = new KCComparator[K, C]
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()
/**
......@@ -173,7 +172,7 @@ class ExternalAppendOnlyMap[K, V, C](
}
try {
val it = currentMap.destructiveSortedIterator(comparator)
val it = currentMap.destructiveSortedIterator(keyComparator)
while (it.hasNext) {
val kv = it.next()
writer.write(kv)
......@@ -231,7 +230,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
private val sortedMap = currentMap.destructiveSortedIterator(comparator)
private val sortedMap = currentMap.destructiveSortedIterator(keyComparator)
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
inputStreams.foreach { it =>
......@@ -252,7 +251,7 @@ class ExternalAppendOnlyMap[K, V, C](
if (it.hasNext) {
var kc = it.next()
kcPairs += kc
val minHash = getKeyHashCode(kc)
val minHash = hashKey(kc)
while (it.hasNext && it.head._1.hashCode() == minHash) {
kc = it.next()
kcPairs += kc
......@@ -298,7 +297,7 @@ class ExternalAppendOnlyMap[K, V, C](
val minPair = minPairs.remove(0)
val minKey = minPair._1
var minCombiner = minPair._2
assert(getKeyHashCode(minPair) == minHash)
assert(hashKey(minPair) == minHash)
// For all other streams that may have this key (i.e. have the same minimum key hash),
// merge in the corresponding value (if any) from that stream
......@@ -339,7 +338,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Invalid if there are no more pairs in this stream
def minKeyHash: Int = {
assert(pairs.length > 0)
getKeyHashCode(pairs.head)
hashKey(pairs.head)
}
override def compareTo(other: StreamBuffer): Int = {
......@@ -423,25 +422,27 @@ class ExternalAppendOnlyMap[K, V, C](
file.delete()
}
}
/** Convenience function to hash the given (K, C) pair by the key. */
private def hashKey(kc: (K, C)): Int = ExternalAppendOnlyMap.hash(kc._1)
}
private[spark] object ExternalAppendOnlyMap {
/**
* Return the key hash code of the given (key, combiner) pair.
* If the key is null, return a special hash code.
* Return the hash code of the given object. If the object is null, return a special hash code.
*/
private def getKeyHashCode[K, C](kc: (K, C)): Int = {
if (kc._1 == null) 0 else kc._1.hashCode()
private def hash[T](obj: T): Int = {
if (obj == null) 0 else obj.hashCode()
}
/**
* A comparator for (key, combiner) pairs based on their key hash codes.
* A comparator which sorts arbitrary keys based on their hash codes.
*/
private class KCComparator[K, C] extends Comparator[(K, C)] {
def compare(kc1: (K, C), kc2: (K, C)): Int = {
val hash1 = getKeyHashCode(kc1)
val hash2 = getKeyHashCode(kc2)
private class HashComparator[K] extends Comparator[K] {
def compare(key1: K, key2: K): Int = {
val hash1 = hash(key1)
val hash2 = hash(key2)
if (hash1 < hash2) -1 else if (hash1 == hash2) 0 else 1
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util.collection
import scala.reflect.ClassTag
/**
* Abstraction for sorting an arbitrary input buffer of data. This interface requires determining
* the sort key for a given element index, as well as swapping elements and moving data from one
* buffer to another.
*
* Example format: an array of numbers, where each element is also the key.
* See [[KVArraySortDataFormat]] for a more exciting format.
*
* This trait extends Any to ensure it is universal (and thus compiled to a Java interface).
*
* @tparam K Type of the sort key of each element
* @tparam Buffer Internal data structure used by a particular format (e.g., Array[Int]).
*/
// TODO: Making Buffer a real trait would be a better abstraction, but adds some complexity.
private[spark] trait SortDataFormat[K, Buffer] extends Any {
/** Return the sort key for the element at the given index. */
protected def getKey(data: Buffer, pos: Int): K
/** Swap two elements. */
protected def swap(data: Buffer, pos0: Int, pos1: Int): Unit
/** Copy a single element from src(srcPos) to dst(dstPos). */
protected def copyElement(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int): Unit
/**
* Copy a range of elements starting at src(srcPos) to dst, starting at dstPos.
* Overlapping ranges are allowed.
*/
protected def copyRange(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int, length: Int): Unit
/**
* Allocates a Buffer that can hold up to 'length' elements.
* All elements of the buffer should be considered invalid until data is explicitly copied in.
*/
protected def allocate(length: Int): Buffer
}
/**
* Supports sorting an array of key-value pairs where the elements of the array alternate between
* keys and values, as used in [[AppendOnlyMap]].
*
* @tparam K Type of the sort key of each element
* @tparam T Type of the Array we're sorting. Typically this must extend AnyRef, to support cases
* when the keys and values are not the same type.
*/
private[spark]
class KVArraySortDataFormat[K, T <: AnyRef : ClassTag] extends SortDataFormat[K, Array[T]] {
override protected def getKey(data: Array[T], pos: Int): K = data(2 * pos).asInstanceOf[K]
override protected def swap(data: Array[T], pos0: Int, pos1: Int) {
val tmpKey = data(2 * pos0)
val tmpVal = data(2 * pos0 + 1)
data(2 * pos0) = data(2 * pos1)
data(2 * pos0 + 1) = data(2 * pos1 + 1)
data(2 * pos1) = tmpKey
data(2 * pos1 + 1) = tmpVal
}
override protected def copyElement(src: Array[T], srcPos: Int, dst: Array[T], dstPos: Int) {
dst(2 * dstPos) = src(2 * srcPos)
dst(2 * dstPos + 1) = src(2 * srcPos + 1)
}
override protected def copyRange(src: Array[T], srcPos: Int,
dst: Array[T], dstPos: Int, length: Int) {
System.arraycopy(src, 2 * srcPos, dst, 2 * dstPos, 2 * length)
}
override protected def allocate(length: Int): Array[T] = {
new Array[T](2 * length)
}
}
......@@ -170,10 +170,10 @@ class AppendOnlyMapSuite extends FunSuite {
case e: IllegalStateException => fail()
}
val it = map.destructiveSortedIterator(new Comparator[(String, String)] {
def compare(kv1: (String, String), kv2: (String, String)): Int = {
val x = if (kv1 != null && kv1._1 != null) kv1._1.toInt else Int.MinValue
val y = if (kv2 != null && kv2._1 != null) kv2._1.toInt else Int.MinValue
val it = map.destructiveSortedIterator(new Comparator[String] {
def compare(key1: String, key2: String): Int = {
val x = if (key1 != null) key1.toInt else Int.MinValue
val y = if (key2 != null) key2.toInt else Int.MinValue
x.compareTo(y)
}
})
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util.collection
import java.lang.{Float => JFloat}
import java.util.{Arrays, Comparator}
import org.scalatest.FunSuite
import org.apache.spark.util.random.XORShiftRandom
class SorterSuite extends FunSuite {
test("equivalent to Arrays.sort") {
val rand = new XORShiftRandom(123)
val data0 = Array.tabulate[Int](10000) { i => rand.nextInt() }
val data1 = data0.clone()
Arrays.sort(data0)
new Sorter(new IntArraySortDataFormat).sort(data1, 0, data1.length, Ordering.Int)
data0.zip(data1).foreach { case (x, y) => assert(x === y) }
}
test("KVArraySorter") {
val rand = new XORShiftRandom(456)
// Construct an array of keys (to Java sort) and an array where the keys and values
// alternate. Keys are random doubles, values are ordinals from 0 to length.
val keys = Array.tabulate[Double](5000) { i => rand.nextDouble() }
val keyValueArray = Array.tabulate[Number](10000) { i =>
if (i % 2 == 0) keys(i / 2) else new Integer(i / 2)
}
// Map from generated keys to values, to verify correctness later
val kvMap =
keyValueArray.grouped(2).map { case Array(k, v) => k.doubleValue() -> v.intValue() }.toMap
Arrays.sort(keys)
new Sorter(new KVArraySortDataFormat[Double, Number])
.sort(keyValueArray, 0, keys.length, Ordering.Double)
keys.zipWithIndex.foreach { case (k, i) =>
assert(k === keyValueArray(2 * i))
assert(kvMap(k) === keyValueArray(2 * i + 1))
}
}
/**
* This provides a simple benchmark for comparing the Sorter with Java internal sorting.
* Ideally these would be executed one at a time, each in their own JVM, so their listing
* here is mainly to have the code.
*
* The goal of this code is to sort an array of key-value pairs, where the array physically
* has the keys and values alternating. The basic Java sorts work only on the keys, so the
* real Java solution is to make Tuple2s to store the keys and values and sort an array of
* those, while the Sorter approach can work directly on the input data format.
*
* Note that the Java implementation varies tremendously between Java 6 and Java 7, when
* the Java sort changed from merge sort to Timsort.
*/
ignore("Sorter benchmark") {
/** Runs an experiment several times. */
def runExperiment(name: String)(f: => Unit): Unit = {
val firstTry = org.apache.spark.util.Utils.timeIt(1)(f)
System.gc()
var i = 0
var next10: Long = 0
while (i < 10) {
val time = org.apache.spark.util.Utils.timeIt(1)(f)
next10 += time
println(s"$name: Took $time ms")
i += 1
}
println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)")
}
val numElements = 25000000 // 25 mil
val rand = new XORShiftRandom(123)
val keys = Array.tabulate[JFloat](numElements) { i =>
new JFloat(rand.nextFloat())
}
// Test our key-value pairs where each element is a Tuple2[Float, Integer)
val kvTupleArray = Array.tabulate[AnyRef](numElements) { i =>
(keys(i / 2): Float, i / 2: Int)
}
runExperiment("Tuple-sort using Arrays.sort()") {
Arrays.sort(kvTupleArray, new Comparator[AnyRef] {
override def compare(x: AnyRef, y: AnyRef): Int =
Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1)
})
}
// Test our Sorter where each element alternates between Float and Integer, non-primitive
val keyValueArray = Array.tabulate[AnyRef](numElements * 2) { i =>
if (i % 2 == 0) keys(i / 2) else new Integer(i / 2)
}
val sorter = new Sorter(new KVArraySortDataFormat[JFloat, AnyRef])
runExperiment("KV-sort using Sorter") {
sorter.sort(keyValueArray, 0, keys.length, new Comparator[JFloat] {
override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y)
})
}
// Test non-primitive sort on float array
runExperiment("Java Arrays.sort()") {
Arrays.sort(keys, new Comparator[JFloat] {
override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y)
})
}
// Test primitive sort on float array
val primitiveKeys = Array.tabulate[Float](numElements) { i => rand.nextFloat() }
runExperiment("Java Arrays.sort() on primitive keys") {
Arrays.sort(primitiveKeys)
}
}
}
/** Format to sort a simple Array[Int]. Could be easily generified and specialized. */
class IntArraySortDataFormat extends SortDataFormat[Int, Array[Int]] {
override protected def getKey(data: Array[Int], pos: Int): Int = {
data(pos)
}
override protected def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = {
val tmp = data(pos0)
data(pos0) = data(pos1)
data(pos1) = tmp
}
override protected def copyElement(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int) {
dst(dstPos) = src(srcPos)
}
/** Copy a range of elements starting at src(srcPos) to dest, starting at destPos. */
override protected def copyRange(src: Array[Int], srcPos: Int,
dst: Array[Int], dstPos: Int, length: Int) {
System.arraycopy(src, srcPos, dst, dstPos, length)
}
/** Allocates a new structure that can hold up to 'length' elements. */
override protected def allocate(length: Int): Array[Int] = {
new Array[Int](length)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment