mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-19 13:20:37 +00:00
Compare commits
3 Commits
erik/commu
...
quantumish
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cee8c10582 | ||
|
|
362fa1af8f | ||
|
|
6a76bc63f9 |
@@ -23,7 +23,3 @@ tempfile = "3.14.0"
|
||||
[[bench]]
|
||||
name = "hmap_resize"
|
||||
harness = false
|
||||
|
||||
[[bin]]
|
||||
name = "hmap_test"
|
||||
path = "main.rs"
|
||||
|
||||
@@ -21,6 +21,8 @@ pub mod entry;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
mod optim;
|
||||
|
||||
use core::{CoreHashMap, INVALID_POS};
|
||||
use entry::{Entry, OccupiedEntry};
|
||||
|
||||
@@ -54,18 +56,21 @@ impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
|
||||
}
|
||||
|
||||
pub fn attach_writer(self) -> HashMapAccess<'a, K, V, S> {
|
||||
// carve out the HashMapShared struct from the area.
|
||||
let mut ptr: *mut u8 = self.shared_ptr.cast();
|
||||
let mut ptr: *mut u8 = self.shared_ptr.cast();
|
||||
let end_ptr: *mut u8 = unsafe { ptr.add(self.shared_size) };
|
||||
ptr = unsafe { ptr.add(ptr.align_offset(align_of::<HashMapShared<K, V>>())) };
|
||||
let shared_ptr: *mut HashMapShared<K, V> = ptr.cast();
|
||||
ptr = unsafe { ptr.add(size_of::<HashMapShared<K, V>>()) };
|
||||
|
||||
|
||||
// carve out the buckets
|
||||
ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<core::Bucket<K, V>>())) };
|
||||
let buckets_ptr = ptr;
|
||||
ptr = unsafe { ptr.add(size_of::<core::Bucket<K, V>>() * self.num_buckets as usize) };
|
||||
|
||||
ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<core::LinkedKey<K>>())) };
|
||||
let keys_ptr = ptr;
|
||||
ptr = unsafe { ptr.add(size_of::<core::LinkedKey<K>>() * self.num_buckets as usize) };
|
||||
|
||||
ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<Option<V>>())) };
|
||||
let vals_ptr = ptr;
|
||||
ptr = unsafe { ptr.add(size_of::<Option<V>>() * self.num_buckets as usize) };
|
||||
|
||||
// use remaining space for the dictionary
|
||||
ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<u32>())) };
|
||||
assert!(ptr.addr() < end_ptr.addr());
|
||||
@@ -73,12 +78,14 @@ impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
|
||||
let dictionary_size = unsafe { end_ptr.byte_offset_from(ptr) / size_of::<u32>() as isize };
|
||||
assert!(dictionary_size > 0);
|
||||
|
||||
let buckets =
|
||||
unsafe { std::slice::from_raw_parts_mut(buckets_ptr.cast(), self.num_buckets as usize) };
|
||||
let keys =
|
||||
unsafe { std::slice::from_raw_parts_mut(keys_ptr.cast(), self.num_buckets as usize) };
|
||||
let vals =
|
||||
unsafe { std::slice::from_raw_parts_mut(vals_ptr.cast(), self.num_buckets as usize) };
|
||||
let dictionary = unsafe {
|
||||
std::slice::from_raw_parts_mut(dictionary_ptr.cast(), dictionary_size as usize)
|
||||
};
|
||||
let hashmap = CoreHashMap::new(buckets, dictionary);
|
||||
let hashmap = CoreHashMap::new(keys, vals, dictionary);
|
||||
unsafe {
|
||||
std::ptr::write(shared_ptr, HashMapShared { inner: hashmap });
|
||||
}
|
||||
@@ -92,7 +99,7 @@ impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
|
||||
|
||||
pub fn attach_reader(self) -> HashMapAccess<'a, K, V, S> {
|
||||
// no difference to attach_writer currently
|
||||
self.attach_writer()
|
||||
self.attach_writer()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,22 +219,22 @@ where
|
||||
/// iterate through the hash map. (An Iterator might be nicer. The communicator's
|
||||
/// clock algorithm needs to _slowly_ iterate through all buckets with its clock hand,
|
||||
/// without holding a lock. If we switch to an Iterator, it must not hold the lock.)
|
||||
pub fn get_at_bucket(&self, pos: usize) -> Option<&(K, V)> {
|
||||
pub fn get_at_bucket(&self, pos: usize) -> Option<(&K, &V)> {
|
||||
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
|
||||
|
||||
if pos >= map.inner.buckets.len() {
|
||||
if pos >= map.inner.keys.len() {
|
||||
return None;
|
||||
}
|
||||
let bucket = &map.inner.buckets[pos];
|
||||
bucket.inner.as_ref()
|
||||
let key = &map.inner.keys[pos];
|
||||
key.inner.as_ref().map(|k| (k, map.inner.vals[pos].as_ref().unwrap()))
|
||||
}
|
||||
|
||||
pub fn get_bucket_for_value(&self, val_ptr: *const V) -> usize {
|
||||
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
|
||||
|
||||
let origin = map.inner.buckets.as_ptr();
|
||||
let origin = map.inner.vals.as_ptr();
|
||||
let idx = (val_ptr as usize - origin as usize) / (size_of::<V>() as usize);
|
||||
assert!(idx < map.inner.buckets.len());
|
||||
assert!(idx < map.inner.vals.len());
|
||||
|
||||
idx
|
||||
}
|
||||
@@ -249,7 +256,7 @@ where
|
||||
fn rehash_dict(
|
||||
&mut self,
|
||||
inner: &mut CoreHashMap<'a, K, V>,
|
||||
buckets_ptr: *mut core::Bucket<K, V>,
|
||||
keys_ptr: *mut core::LinkedKey<K>,
|
||||
end_ptr: *mut u8,
|
||||
num_buckets: u32,
|
||||
rehash_buckets: u32,
|
||||
@@ -257,17 +264,19 @@ where
|
||||
inner.free_head = INVALID_POS;
|
||||
|
||||
// Recalculate the dictionary
|
||||
let buckets;
|
||||
let keys;
|
||||
let dictionary;
|
||||
unsafe {
|
||||
let buckets_end_ptr = buckets_ptr.add(num_buckets as usize);
|
||||
let dictionary_ptr: *mut u32 = buckets_end_ptr
|
||||
.byte_add(buckets_end_ptr.align_offset(align_of::<u32>()))
|
||||
let keys_end_ptr = keys_ptr.add(num_buckets as usize);
|
||||
let buckets_end_ptr: *mut u8 = (keys_end_ptr as *mut u8)
|
||||
.add(size_of::<Option<V>>() * num_buckets as usize);
|
||||
let dictionary_ptr: *mut u32 = buckets_end_ptr
|
||||
.byte_add(buckets_end_ptr.align_offset(align_of::<u32>()))
|
||||
.cast();
|
||||
let dictionary_size: usize =
|
||||
end_ptr.byte_offset_from(buckets_end_ptr) as usize / size_of::<u32>();
|
||||
|
||||
buckets = std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize);
|
||||
keys = std::slice::from_raw_parts_mut(keys_ptr, num_buckets as usize);
|
||||
dictionary = std::slice::from_raw_parts_mut(dictionary_ptr, dictionary_size);
|
||||
}
|
||||
for i in 0..dictionary.len() {
|
||||
@@ -275,21 +284,21 @@ where
|
||||
}
|
||||
|
||||
for i in 0..rehash_buckets as usize {
|
||||
if buckets[i].inner.is_none() {
|
||||
buckets[i].next = inner.free_head;
|
||||
inner.free_head = i as u32;
|
||||
if keys[i].inner.is_none() {
|
||||
keys[i].next = inner.free_head;
|
||||
inner.free_head = i as u32;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let hash = self.hasher.hash_one(&buckets[i].inner.as_ref().unwrap().0);
|
||||
let hash = self.hasher.hash_one(&keys[i].inner.as_ref().unwrap());
|
||||
let pos: usize = (hash % dictionary.len() as u64) as usize;
|
||||
buckets[i].next = dictionary[pos];
|
||||
keys[i].next = dictionary[pos];
|
||||
dictionary[pos] = i as u32;
|
||||
}
|
||||
|
||||
// Finally, update the CoreHashMap struct
|
||||
inner.dictionary = dictionary;
|
||||
inner.buckets = buckets;
|
||||
inner.keys = keys;
|
||||
}
|
||||
|
||||
/// Rehash the map. Intended for benchmarking only.
|
||||
@@ -299,136 +308,131 @@ where
|
||||
let num_buckets = inner.get_num_buckets() as u32;
|
||||
let size_bytes = HashMapInit::<K, V, S>::estimate_size(num_buckets);
|
||||
let end_ptr: *mut u8 = unsafe { (self.shared_ptr as *mut u8).add(size_bytes) };
|
||||
let buckets_ptr = inner.buckets.as_mut_ptr();
|
||||
self.rehash_dict(inner, buckets_ptr, end_ptr, num_buckets, num_buckets);
|
||||
let keys_ptr = inner.keys.as_mut_ptr();
|
||||
self.rehash_dict(inner, keys_ptr, end_ptr, num_buckets, num_buckets);
|
||||
}
|
||||
|
||||
/// Grow
|
||||
///
|
||||
/// 1. grow the underlying shared memory area
|
||||
/// 2. Initialize new buckets. This overwrites the current dictionary
|
||||
/// 3. Recalculate the dictionary
|
||||
pub fn grow(&mut self, num_buckets: u32) -> Result<(), crate::shmem::Error> {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
let inner = &mut map.inner;
|
||||
let old_num_buckets = inner.buckets.len() as u32;
|
||||
|
||||
if num_buckets < old_num_buckets {
|
||||
panic!("grow called with a smaller number of buckets");
|
||||
}
|
||||
if num_buckets == old_num_buckets {
|
||||
return Ok(());
|
||||
}
|
||||
let shmem_handle = self
|
||||
.shmem_handle
|
||||
.as_ref()
|
||||
.expect("grow called on a fixed-size hash table");
|
||||
|
||||
let size_bytes = HashMapInit::<K, V, S>::estimate_size(num_buckets);
|
||||
shmem_handle.set_size(size_bytes)?;
|
||||
let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
|
||||
|
||||
// Initialize new buckets. The new buckets are linked to the free list. NB: This overwrites
|
||||
// the dictionary!
|
||||
let buckets_ptr = inner.buckets.as_mut_ptr();
|
||||
unsafe {
|
||||
for i in old_num_buckets..num_buckets {
|
||||
let bucket_ptr = buckets_ptr.add(i as usize);
|
||||
bucket_ptr.write(core::Bucket {
|
||||
next: if i < num_buckets-1 {
|
||||
i as u32 + 1
|
||||
} else {
|
||||
inner.free_head
|
||||
},
|
||||
inner: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
self.rehash_dict(inner, buckets_ptr, end_ptr, num_buckets, old_num_buckets);
|
||||
inner.free_head = old_num_buckets;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Begin a shrink, limiting all new allocations to be in buckets with index below `num_buckets`.
|
||||
pub fn begin_shrink(&mut self, num_buckets: u32) {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
if num_buckets > map.inner.get_num_buckets() as u32 {
|
||||
panic!("shrink called with a larger number of buckets");
|
||||
}
|
||||
_ = self
|
||||
.shmem_handle
|
||||
.as_ref()
|
||||
.expect("shrink called on a fixed-size hash table");
|
||||
map.inner.alloc_limit = num_buckets;
|
||||
}
|
||||
|
||||
/// Returns whether a shrink operation is currently in progress.
|
||||
pub fn is_shrinking(&self) -> bool {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
map.inner.is_shrinking()
|
||||
}
|
||||
|
||||
/// Returns how many entries need to be evicted before shrink can complete.
|
||||
pub fn shrink_remaining(&self) -> usize {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
let inner = &mut map.inner;
|
||||
if !inner.is_shrinking() {
|
||||
panic!("shrink_remaining called when no ongoing shrink")
|
||||
} else {
|
||||
inner.buckets_in_use
|
||||
.checked_sub(inner.alloc_limit)
|
||||
.unwrap_or(0)
|
||||
as usize
|
||||
}
|
||||
}
|
||||
|
||||
/// Complete a shrink after caller has evicted entries, removing the unused buckets and rehashing.
|
||||
pub fn finish_shrink(&mut self) -> Result<(), crate::shmem::Error> {
|
||||
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
let inner = &mut map.inner;
|
||||
if !inner.is_shrinking() {
|
||||
panic!("called finish_shrink when no shrink is in progress");
|
||||
}
|
||||
// /// Grow
|
||||
// ///
|
||||
// /// 1. grow the underlying shared memory area
|
||||
// /// 2. Initialize new buckets. This overwrites the current dictionary
|
||||
// /// 3. Recalculate the dictionary
|
||||
// pub fn grow(&mut self, num_buckets: u32) -> Result<(), crate::shmem::Error> {
|
||||
// let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
// let inner = &mut map.inner;
|
||||
// let old_num_buckets = inner.buckets.len() as u32;
|
||||
// if num_buckets < old_num_buckets {
|
||||
// panic!("grow called with a smaller number of buckets");
|
||||
// }
|
||||
// if num_buckets == old_num_buckets {
|
||||
// return Ok(());
|
||||
// }
|
||||
// let shmem_handle = self
|
||||
// .shmem_handle
|
||||
// .as_ref()
|
||||
// .expect("grow called on a fixed-size hash table");
|
||||
|
||||
let num_buckets = inner.alloc_limit;
|
||||
// let size_bytes = HashMapInit::<K, V, S>::estimate_size(num_buckets);
|
||||
// shmem_handle.set_size(size_bytes)?;
|
||||
// let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
|
||||
|
||||
if inner.get_num_buckets() == num_buckets as usize {
|
||||
return Ok(());
|
||||
} else if inner.buckets_in_use > num_buckets {
|
||||
panic!("called finish_shrink before enough entries were removed");
|
||||
}
|
||||
// // Initialize new buckets. The new buckets are linked to the free list. NB: This overwrites
|
||||
// // the dictionary!
|
||||
// let keys_ptr = inner.keys.as_mut_ptr();
|
||||
// unsafe {
|
||||
// for i in old_num_buckets..num_buckets {
|
||||
// let bucket_ptr = buckets_ptr.add(i as usize);
|
||||
// bucket_ptr.write(core::Bucket {
|
||||
// next: if i < num_buckets-1 {
|
||||
// i as u32 + 1
|
||||
// } else {
|
||||
// inner.free_head
|
||||
// },
|
||||
// prev: if i > 0 {
|
||||
// PrevPos::Chained(i as u32 - 1)
|
||||
// } else {
|
||||
// PrevPos::First(INVALID_POS)
|
||||
// },
|
||||
// inner: None,
|
||||
// });
|
||||
// }
|
||||
// }
|
||||
// self.rehash_dict(inner, keys_ptr, end_ptr, num_buckets, old_num_buckets);
|
||||
// inner.free_head = old_num_buckets;
|
||||
|
||||
let mut open_spots = 0;
|
||||
let mut curr = inner.free_head;
|
||||
while curr != INVALID_POS {
|
||||
if curr < num_buckets {
|
||||
open_spots += 1;
|
||||
}
|
||||
curr = inner.buckets[curr as usize].next;
|
||||
}
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
// /// Begin a shrink, limiting all new allocations to be in buckets with index less than `num_buckets`.
|
||||
// pub fn begin_shrink(&mut self, num_buckets: u32) {
|
||||
// let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
// if num_buckets > map.inner.get_num_buckets() as u32 {
|
||||
// panic!("shrink called with a larger number of buckets");
|
||||
// }
|
||||
// _ = self
|
||||
// .shmem_handle
|
||||
// .as_ref()
|
||||
// .expect("shrink called on a fixed-size hash table");
|
||||
// map.inner.alloc_limit = num_buckets;
|
||||
// }
|
||||
|
||||
// /// Complete a shrink after caller has evicted entries, removing the unused buckets and rehashing.
|
||||
// pub fn finish_shrink(&mut self) -> Result<(), crate::shmem::Error> {
|
||||
// let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
|
||||
// let inner = &mut map.inner;
|
||||
// if !inner.is_shrinking() {
|
||||
// panic!("called finish_shrink when no shrink is in progress");
|
||||
// }
|
||||
|
||||
// let num_buckets = inner.alloc_limit;
|
||||
|
||||
// if inner.get_num_buckets() == num_buckets as usize {
|
||||
// return Ok(());
|
||||
// }
|
||||
|
||||
for i in (num_buckets as usize)..inner.buckets.len() {
|
||||
if let Some((k, v)) = inner.buckets[i].inner.take() {
|
||||
// alloc bucket increases buckets in use, so need to decrease since we're just moving
|
||||
inner.buckets_in_use -= 1;
|
||||
inner.alloc_bucket(k, v).unwrap();
|
||||
}
|
||||
}
|
||||
// for i in (num_buckets as usize)..inner.buckets.len() {
|
||||
// if inner.buckets[i].inner.is_some() {
|
||||
// // TODO(quantumish) Do we want to treat this as a violation of an invariant
|
||||
// // or a legitimate error the caller can run into? Originally I thought this
|
||||
// // could return something like a UnevictedError(index) as soon as it runs
|
||||
// // into something (that way a caller could clear their soon-to-be-shrinked
|
||||
// // buckets by repeatedly trying to call `finish_shrink`).
|
||||
// //
|
||||
// // Would require making a wider error type enum with this and shmem errors.
|
||||
// panic!("unevicted entries in shrinked space")
|
||||
// }
|
||||
// match inner.buckets[i].prev {
|
||||
// PrevPos::First(_) => {
|
||||
// let next_pos = inner.buckets[i].next;
|
||||
// inner.free_head = next_pos;
|
||||
// if next_pos != INVALID_POS {
|
||||
// inner.buckets[next_pos as usize].prev = PrevPos::First(INVALID_POS);
|
||||
// }
|
||||
// },
|
||||
// PrevPos::Chained(j) => {
|
||||
// let next_pos = inner.buckets[i].next;
|
||||
// inner.buckets[j as usize].next = next_pos;
|
||||
// if next_pos != INVALID_POS {
|
||||
// inner.buckets[next_pos as usize].prev = PrevPos::Chained(j);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
let shmem_handle = self
|
||||
.shmem_handle
|
||||
.as_ref()
|
||||
.expect("shrink called on a fixed-size hash table");
|
||||
// let shmem_handle = self
|
||||
// .shmem_handle
|
||||
// .as_ref()
|
||||
// .expect("shrink called on a fixed-size hash table");
|
||||
|
||||
let size_bytes = HashMapInit::<K, V, S>::estimate_size(num_buckets);
|
||||
shmem_handle.set_size(size_bytes)?;
|
||||
let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
|
||||
let buckets_ptr = inner.buckets.as_mut_ptr();
|
||||
self.rehash_dict(inner, buckets_ptr, end_ptr, num_buckets, num_buckets);
|
||||
inner.alloc_limit = INVALID_POS;
|
||||
// let size_bytes = HashMapInit::<K, V, S>::estimate_size(num_buckets);
|
||||
// shmem_handle.set_size(size_bytes)?;
|
||||
// let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
|
||||
// let buckets_ptr = inner.buckets.as_mut_ptr();
|
||||
// self.rehash_dict(inner, buckets_ptr, end_ptr, num_buckets, num_buckets);
|
||||
// inner.alloc_limit = INVALID_POS;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
@@ -10,17 +10,16 @@ use crate::hash::entry::{Entry, OccupiedEntry, PrevPos, VacantEntry};
|
||||
|
||||
pub(crate) const INVALID_POS: u32 = u32::MAX;
|
||||
|
||||
// Bucket
|
||||
pub(crate) struct Bucket<K, V> {
|
||||
pub(crate) next: u32,
|
||||
pub(crate) inner: Option<(K, V)>,
|
||||
pub(crate) struct LinkedKey<K> {
|
||||
pub(crate) inner: Option<K>,
|
||||
pub(crate) next: u32,
|
||||
}
|
||||
|
||||
pub(crate) struct CoreHashMap<'a, K, V> {
|
||||
/// Dictionary used to map hashes to bucket indices.
|
||||
pub(crate) dictionary: &'a mut [u32],
|
||||
/// Buckets containing key-value pairs.
|
||||
pub(crate) buckets: &'a mut [Bucket<K, V>],
|
||||
pub(crate) keys: &'a mut [LinkedKey<K>],
|
||||
pub(crate) vals: &'a mut [Option<V>],
|
||||
/// Head of the freelist.
|
||||
pub(crate) free_head: u32,
|
||||
|
||||
@@ -45,7 +44,8 @@ where
|
||||
let mut size = 0;
|
||||
|
||||
// buckets
|
||||
size += size_of::<Bucket<K, V>>() * num_buckets as usize;
|
||||
size += (size_of::<LinkedKey<K>>() + size_of::<Option<V>>())
|
||||
* num_buckets as usize;
|
||||
|
||||
// dictionary
|
||||
size += (f32::ceil((size_of::<u32>() * num_buckets as usize) as f32 / Self::FILL_FACTOR))
|
||||
@@ -55,36 +55,43 @@ where
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
buckets: &'a mut [MaybeUninit<Bucket<K, V>>],
|
||||
keys: &'a mut [MaybeUninit<LinkedKey<K>>],
|
||||
vals: &'a mut [MaybeUninit<Option<V>>],
|
||||
dictionary: &'a mut [MaybeUninit<u32>],
|
||||
) -> CoreHashMap<'a, K, V> {
|
||||
// Initialize the buckets
|
||||
for i in 0..buckets.len() {
|
||||
buckets[i].write(Bucket {
|
||||
next: if i < buckets.len() - 1 {
|
||||
for i in 0..keys.len() {
|
||||
keys[i].write(LinkedKey {
|
||||
next: if i < keys.len() - 1 {
|
||||
i as u32 + 1
|
||||
} else {
|
||||
INVALID_POS
|
||||
},
|
||||
inner: None,
|
||||
});
|
||||
}
|
||||
},
|
||||
inner: None,
|
||||
});
|
||||
}
|
||||
for i in 0..vals.len() {
|
||||
vals[i].write(None);
|
||||
}
|
||||
|
||||
// Initialize the dictionary
|
||||
// Initialize the dictionary
|
||||
for i in 0..dictionary.len() {
|
||||
dictionary[i].write(INVALID_POS);
|
||||
}
|
||||
|
||||
// TODO: use std::slice::assume_init_mut() once it stabilizes
|
||||
let buckets =
|
||||
unsafe { std::slice::from_raw_parts_mut(buckets.as_mut_ptr().cast(), buckets.len()) };
|
||||
let keys =
|
||||
unsafe { std::slice::from_raw_parts_mut(keys.as_mut_ptr().cast(), keys.len()) };
|
||||
let vals =
|
||||
unsafe { std::slice::from_raw_parts_mut(vals.as_mut_ptr().cast(), vals.len()) };
|
||||
let dictionary = unsafe {
|
||||
std::slice::from_raw_parts_mut(dictionary.as_mut_ptr().cast(), dictionary.len())
|
||||
};
|
||||
|
||||
CoreHashMap {
|
||||
dictionary,
|
||||
buckets,
|
||||
keys,
|
||||
vals,
|
||||
free_head: 0,
|
||||
buckets_in_use: 0,
|
||||
_user_list_head: INVALID_POS,
|
||||
@@ -99,12 +106,12 @@ where
|
||||
return None;
|
||||
}
|
||||
|
||||
let bucket = &self.buckets[next as usize];
|
||||
let (bucket_key, bucket_value) = bucket.inner.as_ref().expect("entry is in use");
|
||||
let keylink = &self.keys[next as usize];
|
||||
let bucket_key = keylink.inner.as_ref().expect("entry is in use");
|
||||
if bucket_key == key {
|
||||
return Some(&bucket_value);
|
||||
return Some(self.vals[next as usize].as_ref().unwrap());
|
||||
}
|
||||
next = bucket.next;
|
||||
next = keylink.next;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,8 +131,8 @@ where
|
||||
let mut prev_pos = PrevPos::First(dict_pos as u32);
|
||||
let mut next = first;
|
||||
loop {
|
||||
let bucket = &mut self.buckets[next as usize];
|
||||
let (bucket_key, _bucket_value) = bucket.inner.as_mut().expect("entry is in use");
|
||||
let keylink = &mut self.keys[next as usize];
|
||||
let bucket_key = keylink.inner.as_mut().expect("entry is in use");
|
||||
if *bucket_key == key {
|
||||
// found existing entry
|
||||
return Entry::Occupied(OccupiedEntry {
|
||||
@@ -136,7 +143,7 @@ where
|
||||
});
|
||||
}
|
||||
|
||||
if bucket.next == INVALID_POS {
|
||||
if keylink.next == INVALID_POS {
|
||||
// No existing entry
|
||||
return Entry::Vacant(VacantEntry {
|
||||
map: self,
|
||||
@@ -145,12 +152,12 @@ where
|
||||
});
|
||||
}
|
||||
prev_pos = PrevPos::Chained(next);
|
||||
next = bucket.next;
|
||||
next = keylink.next;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_num_buckets(&self) -> usize {
|
||||
self.buckets.len()
|
||||
self.keys.len()
|
||||
}
|
||||
|
||||
pub fn is_shrinking(&self) -> bool {
|
||||
@@ -160,9 +167,9 @@ where
|
||||
/// Clears all entries from the hashmap.
|
||||
/// Does not reset any allocation limits, but does clear any entries beyond them.
|
||||
pub fn clear(&mut self) {
|
||||
for i in 0..self.buckets.len() {
|
||||
self.buckets[i] = Bucket {
|
||||
next: if i < self.buckets.len() - 1 {
|
||||
for i in 0..self.keys.len() {
|
||||
self.keys[i] = LinkedKey {
|
||||
next: if i < self.keys.len() - 1 {
|
||||
i as u32 + 1
|
||||
} else {
|
||||
INVALID_POS
|
||||
@@ -170,6 +177,9 @@ where
|
||||
inner: None,
|
||||
}
|
||||
}
|
||||
for i in 0..self.vals.len() {
|
||||
self.vals[i] = None;
|
||||
}
|
||||
|
||||
for i in 0..self.dictionary.len() {
|
||||
self.dictionary[i] = INVALID_POS;
|
||||
@@ -179,13 +189,13 @@ where
|
||||
}
|
||||
|
||||
pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<'a, '_, K, V>> {
|
||||
if pos >= self.buckets.len() {
|
||||
if pos >= self.keys.len() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let entry = self.buckets[pos].inner.as_ref();
|
||||
let entry = self.keys[pos].inner.as_ref();
|
||||
match entry {
|
||||
Some((key, _)) => Some(OccupiedEntry {
|
||||
Some(key) => Some(OccupiedEntry {
|
||||
_key: key.clone(),
|
||||
bucket_pos: pos as u32,
|
||||
prev_pos: PrevPos::Unknown,
|
||||
@@ -202,9 +212,9 @@ where
|
||||
// Find the first bucket we're *allowed* to use.
|
||||
let mut prev = PrevPos::First(self.free_head);
|
||||
while pos != INVALID_POS && pos >= self.alloc_limit {
|
||||
let bucket = &mut self.buckets[pos as usize];
|
||||
let keylink = &mut self.keys[pos as usize];
|
||||
prev = PrevPos::Chained(pos);
|
||||
pos = bucket.next;
|
||||
pos = keylink.next;
|
||||
}
|
||||
if pos == INVALID_POS {
|
||||
return Err(FullError());
|
||||
@@ -213,21 +223,22 @@ where
|
||||
// Repair the freelist.
|
||||
match prev {
|
||||
PrevPos::First(_) => {
|
||||
let next_pos = self.buckets[pos as usize].next;
|
||||
self.free_head = next_pos;
|
||||
let next_pos = self.keys[pos as usize].next;
|
||||
self.free_head = next_pos;
|
||||
}
|
||||
PrevPos::Chained(p) => if p != INVALID_POS {
|
||||
let next_pos = self.buckets[pos as usize].next;
|
||||
self.buckets[p as usize].next = next_pos;
|
||||
let next_pos = self.keys[pos as usize].next;
|
||||
self.keys[p as usize].next = next_pos;
|
||||
},
|
||||
PrevPos::Unknown => unreachable!()
|
||||
}
|
||||
|
||||
// Initialize the bucket.
|
||||
let bucket = &mut self.buckets[pos as usize];
|
||||
let keylink = &mut self.keys[pos as usize];
|
||||
self.buckets_in_use += 1;
|
||||
bucket.next = INVALID_POS;
|
||||
bucket.inner = Some((key, value));
|
||||
keylink.next = INVALID_POS;
|
||||
keylink.inner = Some(key);
|
||||
self.vals[pos as usize] = Some(value);
|
||||
|
||||
return Ok(pos);
|
||||
}
|
||||
|
||||
@@ -43,49 +43,46 @@ pub struct OccupiedEntry<'a, 'b, K, V> {
|
||||
|
||||
impl<'a, 'b, K, V> OccupiedEntry<'a, 'b, K, V> {
|
||||
pub fn get(&self) -> &V {
|
||||
&self.map.buckets[self.bucket_pos as usize]
|
||||
.inner
|
||||
self.map.vals[self.bucket_pos as usize]
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.1
|
||||
}
|
||||
|
||||
pub fn get_mut(&mut self) -> &mut V {
|
||||
&mut self.map.buckets[self.bucket_pos as usize]
|
||||
.inner
|
||||
self.map.vals[self.bucket_pos as usize]
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.1
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, value: V) -> V {
|
||||
let bucket = &mut self.map.buckets[self.bucket_pos as usize];
|
||||
let bucket = &mut self.map.vals[self.bucket_pos as usize];
|
||||
// This assumes inner is Some, which it must be for an OccupiedEntry
|
||||
let old_value = mem::replace(&mut bucket.inner.as_mut().unwrap().1, value);
|
||||
let old_value = mem::replace(bucket.as_mut().unwrap(), value);
|
||||
old_value
|
||||
}
|
||||
|
||||
pub fn remove(self) -> V {
|
||||
// CoreHashMap::remove returns Option<(K, V)>. We know it's Some for an OccupiedEntry.
|
||||
let bucket = &mut self.map.buckets[self.bucket_pos as usize];
|
||||
let keylink = &mut self.map.keys[self.bucket_pos as usize];
|
||||
|
||||
// unlink it from the chain
|
||||
match self.prev_pos {
|
||||
PrevPos::First(dict_pos) => self.map.dictionary[dict_pos as usize] = bucket.next,
|
||||
PrevPos::First(dict_pos) => self.map.dictionary[dict_pos as usize] = keylink.next,
|
||||
PrevPos::Chained(bucket_pos) => {
|
||||
self.map.buckets[bucket_pos as usize].next = bucket.next
|
||||
self.map.keys[bucket_pos as usize].next = keylink.next
|
||||
},
|
||||
PrevPos::Unknown => panic!("can't safely remove entry with unknown previous entry"),
|
||||
}
|
||||
|
||||
// and add it to the freelist
|
||||
let bucket = &mut self.map.buckets[self.bucket_pos as usize];
|
||||
let old_value = bucket.inner.take();
|
||||
bucket.next = self.map.free_head;
|
||||
let keylink = &mut self.map.keys[self.bucket_pos as usize];
|
||||
keylink.inner = None;
|
||||
keylink.next = self.map.free_head;
|
||||
let old_value = self.map.vals[self.bucket_pos as usize].take();
|
||||
self.map.free_head = self.bucket_pos;
|
||||
self.map.buckets_in_use -= 1;
|
||||
|
||||
return old_value.unwrap().1;
|
||||
return old_value.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,11 +98,10 @@ impl<'a, 'b, K: Clone + Hash + Eq, V> VacantEntry<'a, 'b, K, V> {
|
||||
if pos == INVALID_POS {
|
||||
return Err(FullError());
|
||||
}
|
||||
let bucket = &mut self.map.buckets[pos as usize];
|
||||
bucket.next = self.map.dictionary[self.dict_pos as usize];
|
||||
self.map.keys[pos as usize].next = self.map.dictionary[self.dict_pos as usize];
|
||||
self.map.dictionary[self.dict_pos as usize] = pos;
|
||||
|
||||
let result = &mut self.map.buckets[pos as usize].inner.as_mut().unwrap().1;
|
||||
let result = self.map.vals[pos as usize].as_mut().unwrap();
|
||||
return Ok(result);
|
||||
}
|
||||
}
|
||||
|
||||
85
libs/neon-shmem/src/hash/optim.rs
Normal file
85
libs/neon-shmem/src/hash/optim.rs
Normal file
@@ -0,0 +1,85 @@
|
||||
//! Adapted from https://github.com/jsnell/parallel-xxhash (TODO: license?)
|
||||
|
||||
use core::arch::x86::*;
|
||||
|
||||
const PRIME32_1: u32 = 2654435761;
|
||||
const PRIME32_2: u32 = 2246822519;
|
||||
const PRIME32_3: u32 = 3266489917;
|
||||
const PRIME32_4: u32 = 668265263;
|
||||
const PRIME32_5: u32 = 374761393;
|
||||
|
||||
#[cfg(target_arch = "x86_64")]
|
||||
#[target_feature(enable = "avx2")]
|
||||
fn mm256_rol32<const r: u32>(x: __m256i) -> __m256i {
|
||||
return _mm256_or_si256(_mm256_slli_epi32(x, r),
|
||||
_mm256_srli_epi32(x, 32 - r));
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "x86_64")]
|
||||
#[target_feature(enable = "avx2")]
|
||||
fn mm256_fmix32(mut h: __m256i) -> __m256i {
|
||||
h = _mm256_xor_si256(h, _mm256_srli_epi32(h, 15));
|
||||
h = _mm256_mullo_epi32(h, _mm256_set1_epi32(PRIME32_2));
|
||||
h = _mm256_xor_si256(h, _mm256_srli_epi32(h, 13));
|
||||
h = _mm256_mullo_epi32(h, _mm256_set1_epi32(PRIME32_3));
|
||||
h = _mm256_xor_si256(h, _mm256_srli_epi32(h, 16));
|
||||
h
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "x86_64")]
|
||||
#[target_feature(enable = "avx2")]
|
||||
fn mm256_round(mut seed: __m256i, input: __m256i) -> __m256i {
|
||||
seed = _mm256_add_epi32(
|
||||
seed,
|
||||
_mm256_mullo_epi32(input, _mm256_set1_epi32(PRIME32_2))
|
||||
);
|
||||
seed = mm256_rol32::<13>(seed);
|
||||
seed = _mm256_mullo_epi32(seed, _mm256_set1_epi32(PRIME32_1));
|
||||
seed
|
||||
}
|
||||
|
||||
/// Computes xxHash for 8 keys of size 4*N bytes in column-major order.
|
||||
#[cfg(target_arch = "x86_64")]
|
||||
#[target_feature(enable = "avx2")]
|
||||
fn xxhash_many<const N: usize>(keys: *const u32, seed: u32) -> [u32; 8] {
|
||||
let mut res = [0; 8];
|
||||
let mut h = _mm256_set1_epi32(seed + PRIME32_5);
|
||||
if (N >= 4) {
|
||||
let mut v1 = _mm256_set1_epi32(seed + PRIME32_1 + PRIME32_2);
|
||||
let mut v2 = _mm256_set1_epi32(seed + PRIME32_2);
|
||||
let mut v3 = _mm256_set1_epi32(seed);
|
||||
let mut v4 = _mm256_set1_eip32(seed - PRIME32_1);
|
||||
let mut i = 0;
|
||||
while i < (N & !3) {
|
||||
let k1 = _mm256_loadu_si256(keys.add((i + 0) * 8).cast());
|
||||
let k2 = _mm256_loadu_si256(keys.add((i + 1) * 8).cast());
|
||||
let k3 = _mm256_loadu_si256(keys.add((i + 2) * 8).cast());
|
||||
let k4 = _mm256_loadu_si256(keys.add((i + 3) * 8).cast());
|
||||
v1 = mm256_round(v1, k1);
|
||||
v2 = mm256_round(v2, k2);
|
||||
v3 = mm256_round(v3, k3);
|
||||
v4 = mm256_round(v4, k4);
|
||||
i += 4;
|
||||
}
|
||||
h = mm256_rol32::<1>(v1) + mm256_rol32::<7>(v2) +
|
||||
mm256_rol32::<12>(v3) + mm256_rol32::<18>(v4);
|
||||
}
|
||||
|
||||
// Unneeded, keeps bitwise parity with xxhash though.
|
||||
h = _m256_add_epi32(h, _mm256_set1_eip32(N * 4));
|
||||
|
||||
for i in -(N & 3)..0 {
|
||||
let v = _mm256_loadu_si256(keys.add((N + i) * 8));
|
||||
h = _mm256_add_epi32(
|
||||
h,
|
||||
_mm256_mullo_epi32(v, _mm256_set1_epi32(PRIME32_3))
|
||||
);
|
||||
h = _mm256_mullo_epi32(
|
||||
mm256_rol32::<17>(h),
|
||||
_mm256_set1_epi32(PRIME32_4)
|
||||
);
|
||||
}
|
||||
|
||||
_mm256_storeu_si256((&mut res as *mut _).cast(), mm256_fmix32(h));
|
||||
res
|
||||
}
|
||||
Reference in New Issue
Block a user