Skip to content
Snippets Groups Projects
Commit 718cc803 authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge pull request #200 from mateiz/hash-fix

AppendOnlyMap fixes

- Chose a more random reshuffling step for values returned by Object.hashCode to avoid some long chaining that was happening for consecutive integers (e.g. `sc.makeRDD(1 to 100000000, 100).map(t => (t, t)).reduceByKey(_ + _).count`)
- Some other small optimizations throughout (see commit comments)
parents 51aa9d6e 9837a602
No related branches found
No related tags found
No related merge requests found
......@@ -35,6 +35,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
private var capacity = nextPowerOf2(initialCapacity)
private var mask = capacity - 1
private var curSize = 0
private var growThreshold = LOAD_FACTOR * capacity
// Holds keys and values in the same array for memory locality; specifically, the order of
// elements is key0, value0, key1, value1, key2, value2, etc.
......@@ -56,7 +57,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
var i = 1
while (true) {
val curKey = data(2 * pos)
if (k.eq(curKey) || k == curKey) {
if (k.eq(curKey) || k.equals(curKey)) {
return data(2 * pos + 1).asInstanceOf[V]
} else if (curKey.eq(null)) {
return null.asInstanceOf[V]
......@@ -80,9 +81,23 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
haveNullValue = true
return
}
val isNewEntry = putInto(data, k, value.asInstanceOf[AnyRef])
if (isNewEntry) {
incrementSize()
var pos = rehash(key.hashCode) & mask
var i = 1
while (true) {
val curKey = data(2 * pos)
if (curKey.eq(null)) {
data(2 * pos) = k
data(2 * pos + 1) = value.asInstanceOf[AnyRef]
incrementSize() // Since we added a new key
return
} else if (k.eq(curKey) || k.equals(curKey)) {
data(2 * pos + 1) = value.asInstanceOf[AnyRef]
return
} else {
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
}
......@@ -104,7 +119,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
var i = 1
while (true) {
val curKey = data(2 * pos)
if (k.eq(curKey) || k == curKey) {
if (k.eq(curKey) || k.equals(curKey)) {
val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
return newValue
......@@ -161,45 +176,17 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
/** Increase table size by 1, rehashing if necessary */
private def incrementSize() {
curSize += 1
if (curSize > LOAD_FACTOR * capacity) {
if (curSize > growThreshold) {
growTable()
}
}
/**
* Re-hash a value to deal better with hash functions that don't differ
* in the lower bits, similar to java.util.HashMap
* Re-hash a value to deal better with hash functions that don't differ in the lower bits.
* We use the Murmur Hash 3 finalization step that's also used in fastutil.
*/
private def rehash(h: Int): Int = {
val r = h ^ (h >>> 20) ^ (h >>> 12)
r ^ (r >>> 7) ^ (r >>> 4)
}
/**
* Put an entry into a table represented by data, returning true if
* this increases the size of the table or false otherwise. Assumes
* that "data" has at least one empty slot.
*/
private def putInto(data: Array[AnyRef], key: AnyRef, value: AnyRef): Boolean = {
val mask = (data.length / 2) - 1
var pos = rehash(key.hashCode) & mask
var i = 1
while (true) {
val curKey = data(2 * pos)
if (curKey.eq(null)) {
data(2 * pos) = key
data(2 * pos + 1) = value.asInstanceOf[AnyRef]
return true
} else if (curKey.eq(key) || curKey == key) {
data(2 * pos + 1) = value.asInstanceOf[AnyRef]
return false
} else {
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
return false // Never reached but needed to keep compiler happy
it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
}
/** Double the table's size and re-hash everything */
......@@ -211,16 +198,36 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
throw new Exception("Can't make capacity bigger than 2^29 elements")
}
val newData = new Array[AnyRef](2 * newCapacity)
var pos = 0
while (pos < capacity) {
if (!data(2 * pos).eq(null)) {
putInto(newData, data(2 * pos), data(2 * pos + 1))
val newMask = newCapacity - 1
// Insert all our old values into the new array. Note that because our old keys are
// unique, there's no need to check for equality here when we insert.
var oldPos = 0
while (oldPos < capacity) {
if (!data(2 * oldPos).eq(null)) {
val key = data(2 * oldPos)
val value = data(2 * oldPos + 1)
var newPos = rehash(key.hashCode) & newMask
var i = 1
var keepGoing = true
while (keepGoing) {
val curKey = newData(2 * newPos)
if (curKey.eq(null)) {
newData(2 * newPos) = key
newData(2 * newPos + 1) = value
keepGoing = false
} else {
val delta = i
newPos = (newPos + delta) & newMask
i += 1
}
}
}
pos += 1
oldPos += 1
}
data = newData
capacity = newCapacity
mask = newCapacity - 1
mask = newMask
growThreshold = LOAD_FACTOR * newCapacity
}
private def nextPowerOf2(n: Int): Int = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment