Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-02 02:00:38 +00:00

Compare commits: 10 commits, heikki/lfc ... quantumish

- cee8c10582
- 362fa1af8f
- 24e6c68772
- 93a45708ff
- 6a76bc63f9
- 610ea22c46
- 477648b8cd
- bb1e359872
- ac87544e79
- b6b122e07b
Cargo.lock (generated):
@@ -3843,9 +3843,11 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
name = "neon-shmem"
version = "0.1.0"
dependencies = [
 "criterion",
 "nix 0.30.1",
 "rand 0.9.1",
 "rand_distr 0.5.1",
 "rustc-hash 1.1.0",
 "tempfile",
 "thiserror 1.0.69",
 "workspace_hack",
]
libs/neon-shmem/Cargo.toml:

@@ -8,10 +8,18 @@ license.workspace = true
thiserror.workspace = true
nix.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
rustc-hash = { version = "2.1.1" }

[dev-dependencies]
criterion = { workspace = true, features = ["html_reports"] }
rand = "0.9.1"
rand_distr = "0.5.1"
xxhash-rust = { version = "0.8.15", features = ["xxh3"] }
ahash.workspace = true

[target.'cfg(target_os = "macos")'.dependencies]
tempfile = "3.14.0"

[[bench]]
name = "hmap_resize"
harness = false
libs/neon-shmem/src/hash/mod.rs:

@@ -8,9 +8,11 @@
//! [ ] Resizable

use std::fmt::Debug;
-use std::hash::{DefaultHasher, Hash, Hasher};
+use std::hash::{Hash, Hasher, BuildHasher};
use std::mem::MaybeUninit;

+use rustc_hash::FxBuildHasher;

use crate::shmem::ShmemHandle;

mod core;
@@ -19,37 +21,85 @@ pub mod entry;
#[cfg(test)]
mod tests;

-use core::CoreHashMap;
+mod optim;
+
+use core::{CoreHashMap, INVALID_POS};
use entry::{Entry, OccupiedEntry};

#[derive(Debug)]
pub struct OutOfMemoryError();

-pub struct HashMapInit<'a, K, V> {
+pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> {
    // Hash table can be allocated in a fixed memory area, or in a resizeable ShmemHandle.
    shmem_handle: Option<ShmemHandle>,
    shared_ptr: *mut HashMapShared<'a, K, V>,
    shared_size: usize,
+    hasher: S,
    num_buckets: u32,
}

-pub struct HashMapAccess<'a, K, V> {
+pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> {
    shmem_handle: Option<ShmemHandle>,
    shared_ptr: *mut HashMapShared<'a, K, V>,
+    hasher: S,
}

-unsafe impl<'a, K: Sync, V: Sync> Sync for HashMapAccess<'a, K, V> {}
-unsafe impl<'a, K: Send, V: Send> Send for HashMapAccess<'a, K, V> {}
+unsafe impl<'a, K: Sync, V: Sync, S> Sync for HashMapAccess<'a, K, V, S> {}
+unsafe impl<'a, K: Send, V: Send, S> Send for HashMapAccess<'a, K, V, S> {}

-impl<'a, K, V> HashMapInit<'a, K, V> {
-    pub fn attach_writer(self) -> HashMapAccess<'a, K, V> {
+impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
+    pub fn with_hasher(self, hasher: S) -> HashMapInit<'a, K, V, S> {
+        Self { hasher, ..self }
+    }
+
+    pub fn estimate_size(num_buckets: u32) -> usize {
+        // add some margin to cover alignment etc.
+        CoreHashMap::<K, V>::estimate_size(num_buckets) + size_of::<HashMapShared<K, V>>() + 1000
+    }
+
+    pub fn attach_writer(self) -> HashMapAccess<'a, K, V, S> {
        let mut ptr: *mut u8 = self.shared_ptr.cast();
        let end_ptr: *mut u8 = unsafe { ptr.add(self.shared_size) };
        ptr = unsafe { ptr.add(ptr.align_offset(align_of::<HashMapShared<K, V>>())) };
        let shared_ptr: *mut HashMapShared<K, V> = ptr.cast();
        ptr = unsafe { ptr.add(size_of::<HashMapShared<K, V>>()) };

        // carve out the buckets
        ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<core::LinkedKey<K>>())) };
        let keys_ptr = ptr;
        ptr = unsafe { ptr.add(size_of::<core::LinkedKey<K>>() * self.num_buckets as usize) };

        ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<Option<V>>())) };
        let vals_ptr = ptr;
        ptr = unsafe { ptr.add(size_of::<Option<V>>() * self.num_buckets as usize) };

        // use remaining space for the dictionary
        ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<u32>())) };
        assert!(ptr.addr() < end_ptr.addr());
        let dictionary_ptr = ptr;
        let dictionary_size = unsafe { end_ptr.byte_offset_from(ptr) / size_of::<u32>() as isize };
        assert!(dictionary_size > 0);

        let keys =
            unsafe { std::slice::from_raw_parts_mut(keys_ptr.cast(), self.num_buckets as usize) };
        let vals =
            unsafe { std::slice::from_raw_parts_mut(vals_ptr.cast(), self.num_buckets as usize) };
        let dictionary = unsafe {
            std::slice::from_raw_parts_mut(dictionary_ptr.cast(), dictionary_size as usize)
        };
        let hashmap = CoreHashMap::new(keys, vals, dictionary);
        unsafe {
            std::ptr::write(shared_ptr, HashMapShared { inner: hashmap });
        }

        HashMapAccess {
            shmem_handle: self.shmem_handle,
            shared_ptr: self.shared_ptr,
            hasher: self.hasher,
        }
    }

-    pub fn attach_reader(self) -> HashMapAccess<'a, K, V> {
+    pub fn attach_reader(self) -> HashMapAccess<'a, K, V, S> {
        // no difference to attach_writer currently
        self.attach_writer()
    }
}
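Both structs now default `S` to `rustc_hash::FxBuildHasher`, so existing call sites keep compiling, and `with_hasher` swaps in a different hasher value before the map is attached. A minimal sketch under those assumptions (not part of the diff):

```rust
use std::mem::MaybeUninit;
use rustc_hash::FxBuildHasher;

// Sketch only: fixed backing area, with the default hasher value replaced.
// Note with_hasher keeps the same BuildHasher *type* S; it only swaps the value.
let mut area = [MaybeUninit::<u8>::uninit(); 1 << 20];
let _writer = HashMapInit::<u64, u64>::with_fixed(1024, &mut area)
    .with_hasher(FxBuildHasher::default())
    .attach_writer();
```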
@@ -65,86 +115,71 @@ impl<'a, K, V> HashMapInit<'a, K, V> {
///
/// In between the above parts, there can be padding bytes to align the parts correctly.
struct HashMapShared<'a, K, V> {
-    inner: CoreHashMap<'a, K, V>,
+    inner: CoreHashMap<'a, K, V>
}

-impl<'a, K, V> HashMapInit<'a, K, V>
+impl<'a, K, V> HashMapInit<'a, K, V, rustc_hash::FxBuildHasher>
where
-    K: Clone + Hash + Eq,
+    K: Clone + Hash + Eq
{
-    pub fn estimate_size(num_buckets: u32) -> usize {
-        // add some margin to cover alignment etc.
-        CoreHashMap::<K, V>::estimate_size(num_buckets) + size_of::<HashMapShared<K, V>>() + 1000
-    }
-
-    pub fn init_in_fixed_area(
-        num_buckets: u32,
+    pub fn with_fixed(
+        num_buckets: u32,
        area: &'a mut [MaybeUninit<u8>],
    ) -> HashMapInit<'a, K, V> {
-        Self::init_common(num_buckets, None, area.as_mut_ptr().cast(), area.len())
+        Self {
+            num_buckets,
+            shmem_handle: None,
+            shared_ptr: area.as_mut_ptr().cast(),
+            shared_size: area.len(),
+            hasher: rustc_hash::FxBuildHasher::default(),
+        }
    }

    /// Initialize a new hash map in the given shared memory area
-    pub fn init_in_shmem(num_buckets: u32, mut shmem: ShmemHandle) -> HashMapInit<'a, K, V> {
+    pub fn with_shmem(num_buckets: u32, shmem: ShmemHandle) -> HashMapInit<'a, K, V> {
        let size = Self::estimate_size(num_buckets);
        shmem
            .set_size(size)
            .expect("could not resize shared memory area");

-        let ptr = unsafe { shmem.data_ptr.as_mut() };
-        Self::init_common(num_buckets, Some(shmem), ptr, size)
+        Self {
+            num_buckets,
+            shared_ptr: shmem.data_ptr.as_ptr().cast(),
+            shmem_handle: Some(shmem),
+            shared_size: size,
+            hasher: rustc_hash::FxBuildHasher::default()
+        }
    }

-    fn init_common(
-        num_buckets: u32,
-        shmem_handle: Option<ShmemHandle>,
-        area_ptr: *mut u8,
-        area_len: usize,
-    ) -> HashMapInit<'a, K, V> {
-        // carve out the HashMapShared struct from the area.
-        let mut ptr: *mut u8 = area_ptr;
-        let end_ptr: *mut u8 = unsafe { area_ptr.add(area_len) };
-        ptr = unsafe { ptr.add(ptr.align_offset(align_of::<HashMapShared<K, V>>())) };
-        let shared_ptr: *mut HashMapShared<K, V> = ptr.cast();
-        ptr = unsafe { ptr.add(size_of::<HashMapShared<K, V>>()) };
-
-        // carve out the buckets
-        ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<core::Bucket<K, V>>())) };
-        let buckets_ptr = ptr;
-        ptr = unsafe { ptr.add(size_of::<core::Bucket<K, V>>() * num_buckets as usize) };
-
-        // use remaining space for the dictionary
-        ptr = unsafe { ptr.byte_add(ptr.align_offset(align_of::<u32>())) };
-        assert!(ptr.addr() < end_ptr.addr());
-        let dictionary_ptr = ptr;
-        let dictionary_size = unsafe { end_ptr.byte_offset_from(ptr) / size_of::<u32>() as isize };
-        assert!(dictionary_size > 0);
-
-        let buckets =
-            unsafe { std::slice::from_raw_parts_mut(buckets_ptr.cast(), num_buckets as usize) };
-        let dictionary = unsafe {
-            std::slice::from_raw_parts_mut(dictionary_ptr.cast(), dictionary_size as usize)
-        };
-        let hashmap = CoreHashMap::new(buckets, dictionary);
-        unsafe {
-            std::ptr::write(shared_ptr, HashMapShared { inner: hashmap });
-        }
-
-        HashMapInit {
-            shmem_handle: shmem_handle,
-            shared_ptr,
-        }
-    }
+    pub fn new_resizeable_named(num_buckets: u32, max_buckets: u32, name: &str) -> HashMapInit<'a, K, V> {
+        let size = Self::estimate_size(num_buckets);
+        let max_size = Self::estimate_size(max_buckets);
+        let shmem = ShmemHandle::new(name, size, max_size)
+            .expect("failed to make shared memory area");
+
+        Self {
+            num_buckets,
+            shared_ptr: shmem.data_ptr.as_ptr().cast(),
+            shmem_handle: Some(shmem),
+            shared_size: size,
+            hasher: rustc_hash::FxBuildHasher::default()
+        }
+    }
+
+    pub fn new_resizeable(num_buckets: u32, max_buckets: u32) -> HashMapInit<'a, K, V> {
+        use std::sync::atomic::{AtomicUsize, Ordering};
+        // NB: this must be a `static`, not a `const`: a `const` atomic is a fresh
+        // counter at every use site, so the generated names would all collide.
+        static COUNTER: AtomicUsize = AtomicUsize::new(0);
+        let val = COUNTER.fetch_add(1, Ordering::Relaxed);
+        let name = format!("neon_shmem_hmap{}", val);
+        Self::new_resizeable_named(num_buckets, max_buckets, &name)
+    }
}

-impl<'a, K, V> HashMapAccess<'a, K, V>
+impl<'a, K, V, S: BuildHasher> HashMapAccess<'a, K, V, S>
where
    K: Clone + Hash + Eq,
{
    pub fn get_hash_value(&self, key: &K) -> u64 {
-        let mut hasher = DefaultHasher::new();
-        key.hash(&mut hasher);
-        hasher.finish()
+        self.hasher.hash_one(key)
    }

    pub fn get_with_hash<'e>(&'e self, key: &K, hash: u64) -> Option<&'e V> {
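A short usage sketch of the new constructor path, mirroring how the tests below build their maps (the map name here is illustrative):

```rust
// Sketch only: a shared-memory map sized for 1000 buckets, growable to 2000.
// The name identifies the shmem area; new_resizeable generates one instead.
let writer = HashMapInit::<u64, u64>::new_resizeable_named(1000, 2000, "example_map")
    .attach_writer();
let hash = writer.get_hash_value(&42);
assert!(writer.get_with_hash(&42, hash).is_none());
```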
@@ -184,22 +219,22 @@ where
    /// iterate through the hash map. (An Iterator might be nicer. The communicator's
    /// clock algorithm needs to _slowly_ iterate through all buckets with its clock hand,
    /// without holding a lock. If we switch to an Iterator, it must not hold the lock.)
-    pub fn get_at_bucket(&self, pos: usize) -> Option<&(K, V)> {
+    pub fn get_at_bucket(&self, pos: usize) -> Option<(&K, &V)> {
        let map = unsafe { self.shared_ptr.as_ref() }.unwrap();

-        if pos >= map.inner.buckets.len() {
+        if pos >= map.inner.keys.len() {
            return None;
        }
-        let bucket = &map.inner.buckets[pos];
-        bucket.inner.as_ref()
+        let key = &map.inner.keys[pos];
+        key.inner.as_ref().map(|k| (k, map.inner.vals[pos].as_ref().unwrap()))
    }

    pub fn get_bucket_for_value(&self, val_ptr: *const V) -> usize {
        let map = unsafe { self.shared_ptr.as_ref() }.unwrap();

-        let origin = map.inner.buckets.as_ptr();
+        let origin = map.inner.vals.as_ptr();
        let idx = (val_ptr as usize - origin as usize) / (size_of::<V>() as usize);
-        assert!(idx < map.inner.buckets.len());
+        assert!(idx < map.inner.vals.len());

        idx
    }
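The doc comment above motivates positional access with the communicator's clock algorithm; a hedged sketch of such a sweep over the new `(&K, &V)` shape:

```rust
// Sketch (not part of the diff) of a clock-hand sweep over bucket positions:
// one bucket per tick, with no long-lived borrow or lock held across ticks.
fn tick<K: Clone + std::hash::Hash + Eq, V>(
    access: &HashMapAccess<'_, K, V>,
    hand: &mut usize,
    num_buckets: usize,
) {
    if let Some((_key, _val)) = access.get_at_bucket(*hand) {
        // examine the entry here, e.g. age it or mark it as an eviction victim
    }
    *hand = (*hand + 1) % num_buckets;
}
```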
@@ -210,95 +245,194 @@ where
        map.inner.buckets_in_use as usize
    }

-    /// Grow
-    ///
-    /// 1. grow the underlying shared memory area
-    /// 2. Initialize new buckets. This overwrites the current dictionary
-    /// 3. Recalculate the dictionary
-    pub fn grow(&mut self, num_buckets: u32) -> Result<(), crate::shmem::Error> {
-        let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
+    pub fn clear(&mut self) {
+        let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
        let inner = &mut map.inner;
-        let old_num_buckets = inner.buckets.len() as u32;
-
-        if num_buckets < old_num_buckets {
-            panic!("grow called with a smaller number of buckets");
-        }
-        if num_buckets == old_num_buckets {
-            return Ok(());
-        }
-        let shmem_handle = self
-            .shmem_handle
-            .as_ref()
-            .expect("grow called on a fixed-size hash table");
-
-        let size_bytes = HashMapInit::<K, V>::estimate_size(num_buckets);
-        shmem_handle.set_size(size_bytes)?;
-        let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
-
-        // Initialize new buckets. The new buckets are linked to the free list. NB: This overwrites
-        // the dictionary!
-        let buckets_ptr = inner.buckets.as_mut_ptr();
-        unsafe {
-            for i in old_num_buckets..num_buckets {
-                let bucket_ptr = buckets_ptr.add(i as usize);
-                bucket_ptr.write(core::Bucket {
-                    next: if i < num_buckets {
-                        i as u32 + 1
-                    } else {
-                        inner.free_head
-                    },
-                    inner: None,
-                });
-            }
-        }
-
-        // Recalculate the dictionary
-        let buckets;
+        inner.clear()
+    }
+
+    /// Helper function that abstracts the common logic between growing and shrinking.
+    /// The only significant difference in the rehashing step is how many buckets to rehash.
+    fn rehash_dict(
+        &mut self,
+        inner: &mut CoreHashMap<'a, K, V>,
+        keys_ptr: *mut core::LinkedKey<K>,
+        end_ptr: *mut u8,
+        num_buckets: u32,
+        rehash_buckets: u32,
+    ) {
+        inner.free_head = INVALID_POS;
+
+        // Recalculate the dictionary
+        let keys;
        let dictionary;
        unsafe {
-            let buckets_end_ptr = buckets_ptr.add(num_buckets as usize);
+            let keys_end_ptr = keys_ptr.add(num_buckets as usize);
+            let buckets_end_ptr: *mut u8 = (keys_end_ptr as *mut u8)
+                .add(size_of::<Option<V>>() * num_buckets as usize);
            let dictionary_ptr: *mut u32 = buckets_end_ptr
                .byte_add(buckets_end_ptr.align_offset(align_of::<u32>()))
                .cast();
            let dictionary_size: usize =
                end_ptr.byte_offset_from(buckets_end_ptr) as usize / size_of::<u32>();

-            buckets = std::slice::from_raw_parts_mut(buckets_ptr, num_buckets as usize);
+            keys = std::slice::from_raw_parts_mut(keys_ptr, num_buckets as usize);
            dictionary = std::slice::from_raw_parts_mut(dictionary_ptr, dictionary_size);
        }
        for i in 0..dictionary.len() {
-            dictionary[i] = core::INVALID_POS;
+            dictionary[i] = INVALID_POS;
        }

-        for i in 0..old_num_buckets as usize {
-            if buckets[i].inner.is_none() {
-                continue;
-            }
-
-            let mut hasher = DefaultHasher::new();
-            buckets[i].inner.as_ref().unwrap().0.hash(&mut hasher);
-            let hash = hasher.finish();
+        for i in 0..rehash_buckets as usize {
+            if keys[i].inner.is_none() {
+                keys[i].next = inner.free_head;
+                inner.free_head = i as u32;
+                continue;
+            }
+
+            let hash = self.hasher.hash_one(&keys[i].inner.as_ref().unwrap());
            let pos: usize = (hash % dictionary.len() as u64) as usize;
-            buckets[i].next = dictionary[pos];
+            keys[i].next = dictionary[pos];
            dictionary[pos] = i as u32;
        }

        // Finally, update the CoreHashMap struct
        inner.dictionary = dictionary;
-        inner.buckets = buckets;
-        inner.free_head = old_num_buckets;
-
-        Ok(())
-    }
+        inner.keys = keys;
+    }
+
+    /// Rehash the map. Intended for benchmarking only.
+    pub fn shuffle(&mut self) {
+        let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
+        let inner = &mut map.inner;
+        let num_buckets = inner.get_num_buckets() as u32;
+        let size_bytes = HashMapInit::<K, V, S>::estimate_size(num_buckets);
+        let end_ptr: *mut u8 = unsafe { (self.shared_ptr as *mut u8).add(size_bytes) };
+        let keys_ptr = inner.keys.as_mut_ptr();
+        self.rehash_dict(inner, keys_ptr, end_ptr, num_buckets, num_buckets);
+    }

    // /// Grow
    // ///
    // /// 1. grow the underlying shared memory area
    // /// 2. Initialize new buckets. This overwrites the current dictionary
    // /// 3. Recalculate the dictionary
    // pub fn grow(&mut self, num_buckets: u32) -> Result<(), crate::shmem::Error> {
    //     let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
    //     let inner = &mut map.inner;
    //     let old_num_buckets = inner.buckets.len() as u32;
    //     if num_buckets < old_num_buckets {
    //         panic!("grow called with a smaller number of buckets");
    //     }
    //     if num_buckets == old_num_buckets {
    //         return Ok(());
    //     }
    //     let shmem_handle = self
    //         .shmem_handle
    //         .as_ref()
    //         .expect("grow called on a fixed-size hash table");

    //     let size_bytes = HashMapInit::<K, V, S>::estimate_size(num_buckets);
    //     shmem_handle.set_size(size_bytes)?;
    //     let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };

    //     // Initialize new buckets. The new buckets are linked to the free list. NB: This overwrites
    //     // the dictionary!
    //     let keys_ptr = inner.keys.as_mut_ptr();
    //     unsafe {
    //         for i in old_num_buckets..num_buckets {
    //             let bucket_ptr = buckets_ptr.add(i as usize);
    //             bucket_ptr.write(core::Bucket {
    //                 next: if i < num_buckets-1 {
    //                     i as u32 + 1
    //                 } else {
    //                     inner.free_head
    //                 },
    //                 prev: if i > 0 {
    //                     PrevPos::Chained(i as u32 - 1)
    //                 } else {
    //                     PrevPos::First(INVALID_POS)
    //                 },
    //                 inner: None,
    //             });
    //         }
    //     }
    //     self.rehash_dict(inner, keys_ptr, end_ptr, num_buckets, old_num_buckets);
    //     inner.free_head = old_num_buckets;

    //     Ok(())
    // }

    // /// Begin a shrink, limiting all new allocations to be in buckets with index less than `num_buckets`.
    // pub fn begin_shrink(&mut self, num_buckets: u32) {
    //     let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
    //     if num_buckets > map.inner.get_num_buckets() as u32 {
    //         panic!("shrink called with a larger number of buckets");
    //     }
    //     _ = self
    //         .shmem_handle
    //         .as_ref()
    //         .expect("shrink called on a fixed-size hash table");
    //     map.inner.alloc_limit = num_buckets;
    // }

    // /// Complete a shrink after caller has evicted entries, removing the unused buckets and rehashing.
    // pub fn finish_shrink(&mut self) -> Result<(), crate::shmem::Error> {
    //     let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
    //     let inner = &mut map.inner;
    //     if !inner.is_shrinking() {
    //         panic!("called finish_shrink when no shrink is in progress");
    //     }

    //     let num_buckets = inner.alloc_limit;

    //     if inner.get_num_buckets() == num_buckets as usize {
    //         return Ok(());
    //     }

    //     for i in (num_buckets as usize)..inner.buckets.len() {
    //         if inner.buckets[i].inner.is_some() {
    //             // TODO(quantumish) Do we want to treat this as a violation of an invariant
    //             // or a legitimate error the caller can run into? Originally I thought this
    //             // could return something like a UnevictedError(index) as soon as it runs
    //             // into something (that way a caller could clear their soon-to-be-shrinked
    //             // buckets by repeatedly trying to call `finish_shrink`).
    //             //
    //             // Would require making a wider error type enum with this and shmem errors.
    //             panic!("unevicted entries in shrinked space")
    //         }
    //         match inner.buckets[i].prev {
    //             PrevPos::First(_) => {
    //                 let next_pos = inner.buckets[i].next;
    //                 inner.free_head = next_pos;
    //                 if next_pos != INVALID_POS {
    //                     inner.buckets[next_pos as usize].prev = PrevPos::First(INVALID_POS);
    //                 }
    //             },
    //             PrevPos::Chained(j) => {
    //                 let next_pos = inner.buckets[i].next;
    //                 inner.buckets[j as usize].next = next_pos;
    //                 if next_pos != INVALID_POS {
    //                     inner.buckets[next_pos as usize].prev = PrevPos::Chained(j);
    //                 }
    //             }
    //         }
    //     }

    //     let shmem_handle = self
    //         .shmem_handle
    //         .as_ref()
    //         .expect("shrink called on a fixed-size hash table");

    //     let size_bytes = HashMapInit::<K, V, S>::estimate_size(num_buckets);
    //     shmem_handle.set_size(size_bytes)?;
    //     let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
    //     let buckets_ptr = inner.buckets.as_mut_ptr();
    //     self.rehash_dict(inner, buckets_ptr, end_ptr, num_buckets, num_buckets);
    //     inner.alloc_limit = INVALID_POS;

    //     Ok(())
    // }

    // TODO: Shrinking is a multi-step process that requires co-operation from the caller
    //
    // 1. The caller must first call begin_shrink(). That forbids allocation of higher-numbered
    //    buckets.
    //
    // 2. Next, the caller must evict all entries in higher-numbered buckets.
    //
    // 3. Finally, call finish_shrink(). This recomputes the dictionary and shrinks the underlying
    //    shmem area
}
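The TODO above describes the caller-driven shrink protocol; the tests' `do_shrink` helper follows it literally. A condensed sketch, assuming the commented-out `begin_shrink`/`finish_shrink` are compiled in:

```rust
// Condensed from the tests' do_shrink helper. `pick_victim` is a hypothetical
// caller-supplied eviction policy, not an API of this crate.
writer.begin_shrink(target_buckets);          // 1. cap allocations at the target
while writer.get_num_buckets_in_use() > target_buckets as usize {
    let victim = pick_victim();               // 2. evict until under the cap
    let hash = writer.get_hash_value(&victim);
    if let Entry::Occupied(e) = writer.entry_with_hash(victim, hash) {
        e.remove();
    }
}
writer.finish_shrink().unwrap();              // 3. rehash and shrink the shmem area
```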
libs/neon-shmem/src/hash/core.rs:

@@ -10,18 +10,22 @@ use crate::hash::entry::{Entry, OccupiedEntry, PrevPos, VacantEntry};
pub(crate) const INVALID_POS: u32 = u32::MAX;

-// Bucket
-pub(crate) struct Bucket<K, V> {
-    pub(crate) next: u32,
-    pub(crate) inner: Option<(K, V)>,
+pub(crate) struct LinkedKey<K> {
+    pub(crate) inner: Option<K>,
+    pub(crate) next: u32,
}

pub(crate) struct CoreHashMap<'a, K, V> {
    /// Dictionary used to map hashes to bucket indices.
    pub(crate) dictionary: &'a mut [u32],
-    pub(crate) buckets: &'a mut [Bucket<K, V>],
+    pub(crate) keys: &'a mut [LinkedKey<K>],
+    pub(crate) vals: &'a mut [Option<V>],
    /// Head of the freelist.
    pub(crate) free_head: u32,

    pub(crate) _user_list_head: u32,
+    /// Maximum index of a bucket allowed to be allocated. INVALID_POS if no limit.
+    pub(crate) alloc_limit: u32,

    // metrics
    pub(crate) buckets_in_use: u32,
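A toy model of the new key/value split may help: chain traversal touches only the `LinkedKey` array, and the value array is read once a key matches. Sketch, not the real code:

```rust
// Toy model (not the real code) of the split layout: chains live in the keys
// array, payloads in a parallel vals array sharing the same bucket indices.
struct LinkedKey<K> { inner: Option<K>, next: u32 }
const INVALID_POS: u32 = u32::MAX;

fn lookup<'a, K: Eq, V>(
    dictionary: &[u32],
    keys: &[LinkedKey<K>],
    vals: &'a [Option<V>],
    dict_pos: usize,
    key: &K,
) -> Option<&'a V> {
    let mut next = dictionary[dict_pos];
    while next != INVALID_POS {
        let link = &keys[next as usize];
        if link.inner.as_ref() == Some(key) {
            return vals[next as usize].as_ref(); // value touched only on a match
        }
        next = link.next;
    }
    None
}
```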
@@ -40,49 +44,58 @@ where
        let mut size = 0;

        // buckets
-        size += size_of::<Bucket<K, V>>() * num_buckets as usize;
+        size += (size_of::<LinkedKey<K>>() + size_of::<Option<V>>())
+            * num_buckets as usize;

        // dictionary
        size += (f32::ceil((size_of::<u32>() * num_buckets as usize) as f32 / Self::FILL_FACTOR))
            as usize;

        size
    }

    pub fn new(
-        buckets: &'a mut [MaybeUninit<Bucket<K, V>>],
+        keys: &'a mut [MaybeUninit<LinkedKey<K>>],
+        vals: &'a mut [MaybeUninit<Option<V>>],
        dictionary: &'a mut [MaybeUninit<u32>],
    ) -> CoreHashMap<'a, K, V> {
        // Initialize the buckets
-        for i in 0..buckets.len() {
-            buckets[i].write(Bucket {
-                next: if i < buckets.len() - 1 {
+        for i in 0..keys.len() {
+            keys[i].write(LinkedKey {
+                next: if i < keys.len() - 1 {
                    i as u32 + 1
                } else {
                    INVALID_POS
                },
                inner: None,
            });
        }
+        for i in 0..vals.len() {
+            vals[i].write(None);
+        }

        // Initialize the dictionary
        for i in 0..dictionary.len() {
            dictionary[i].write(INVALID_POS);
        }

        // TODO: use std::slice::assume_init_mut() once it stabilizes
-        let buckets =
-            unsafe { std::slice::from_raw_parts_mut(buckets.as_mut_ptr().cast(), buckets.len()) };
+        let keys =
+            unsafe { std::slice::from_raw_parts_mut(keys.as_mut_ptr().cast(), keys.len()) };
+        let vals =
+            unsafe { std::slice::from_raw_parts_mut(vals.as_mut_ptr().cast(), vals.len()) };
        let dictionary = unsafe {
            std::slice::from_raw_parts_mut(dictionary.as_mut_ptr().cast(), dictionary.len())
        };

        CoreHashMap {
            dictionary,
-            buckets,
+            keys,
+            vals,
            free_head: 0,
            buckets_in_use: 0,
            _user_list_head: INVALID_POS,
+            alloc_limit: INVALID_POS,
        }
    }
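To make the sizing concrete, a rough worked example; `FILL_FACTOR` is defined elsewhere in core.rs and 0.8 is assumed here purely for illustration:

```rust
// Illustration only (not part of the diff). Per-bucket cost is one LinkedKey<K>
// plus one Option<V>; the dictionary adds 4 bytes per slot, oversized by
// 1/FILL_FACTOR. Assuming a 24-byte LinkedKey<K> and a 16-byte Option<V>:
let buckets_bytes = 100_000 * (24 + 16);                     // 4_000_000
let dictionary_bytes = (4.0_f32 * 100_000.0 / 0.8) as usize; // 500_000
assert_eq!(buckets_bytes + dictionary_bytes, 4_500_000);     // plus ~1000 B margin upstream
```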
@@ -93,12 +106,12 @@ where
                return None;
            }

-            let bucket = &self.buckets[next as usize];
-            let (bucket_key, bucket_value) = bucket.inner.as_ref().expect("entry is in use");
+            let keylink = &self.keys[next as usize];
+            let bucket_key = keylink.inner.as_ref().expect("entry is in use");
            if bucket_key == key {
-                return Some(&bucket_value);
+                return Some(self.vals[next as usize].as_ref().unwrap());
            }
-            next = bucket.next;
+            next = keylink.next;
        }
    }
@@ -118,8 +131,8 @@ where
        let mut prev_pos = PrevPos::First(dict_pos as u32);
        let mut next = first;
        loop {
-            let bucket = &mut self.buckets[next as usize];
-            let (bucket_key, _bucket_value) = bucket.inner.as_mut().expect("entry is in use");
+            let keylink = &mut self.keys[next as usize];
+            let bucket_key = keylink.inner.as_mut().expect("entry is in use");
            if *bucket_key == key {
                // found existing entry
                return Entry::Occupied(OccupiedEntry {
@@ -130,7 +143,7 @@ where
                });
            }

-            if bucket.next == INVALID_POS {
+            if keylink.next == INVALID_POS {
                // No existing entry
                return Entry::Vacant(VacantEntry {
                    map: self,
@@ -139,36 +152,96 @@ where
                });
            }
            prev_pos = PrevPos::Chained(next);
-            next = bucket.next;
+            next = keylink.next;
        }
    }

    pub fn get_num_buckets(&self) -> usize {
-        self.buckets.len()
+        self.keys.len()
    }

-    pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<K, V>> {
-        if pos >= self.buckets.len() {
-            return None;
-        }
-
-        todo!()
-        //self.buckets[pos].inner.as_ref()
-    }
+    pub fn is_shrinking(&self) -> bool {
+        self.alloc_limit != INVALID_POS
+    }
+
+    /// Clears all entries from the hashmap.
+    /// Does not reset any allocation limits, but does clear any entries beyond them.
+    pub fn clear(&mut self) {
+        for i in 0..self.keys.len() {
+            self.keys[i] = LinkedKey {
+                next: if i < self.keys.len() - 1 {
+                    i as u32 + 1
+                } else {
+                    INVALID_POS
+                },
+                inner: None,
+            }
+        }
+        for i in 0..self.vals.len() {
+            self.vals[i] = None;
+        }
+
+        for i in 0..self.dictionary.len() {
+            self.dictionary[i] = INVALID_POS;
+        }
+
+        self.buckets_in_use = 0;
+    }
+
+    pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<'a, '_, K, V>> {
+        if pos >= self.keys.len() {
+            return None;
+        }
+
+        let entry = self.keys[pos].inner.as_ref();
+        match entry {
+            Some(key) => Some(OccupiedEntry {
+                _key: key.clone(),
+                bucket_pos: pos as u32,
+                prev_pos: PrevPos::Unknown,
+                map: self,
+            }),
+            _ => None,
+        }
+    }

    /// Find the position of an unused bucket via the freelist and initialize it.
    pub(crate) fn alloc_bucket(&mut self, key: K, value: V) -> Result<u32, FullError> {
-        let pos = self.free_head;
-        if pos == INVALID_POS {
-            return Err(FullError());
-        }
-
-        let bucket = &mut self.buckets[pos as usize];
-        self.free_head = bucket.next;
-        self.buckets_in_use += 1;
-
-        bucket.next = INVALID_POS;
-        bucket.inner = Some((key, value));
+        let mut pos = self.free_head;
+
+        // Find the first bucket we're *allowed* to use.
+        let mut prev = PrevPos::First(self.free_head);
+        while pos != INVALID_POS && pos >= self.alloc_limit {
+            let keylink = &mut self.keys[pos as usize];
+            prev = PrevPos::Chained(pos);
+            pos = keylink.next;
+        }
+        if pos == INVALID_POS {
+            return Err(FullError());
+        }
+
+        // Repair the freelist.
+        match prev {
+            PrevPos::First(_) => {
+                let next_pos = self.keys[pos as usize].next;
+                self.free_head = next_pos;
+            }
+            PrevPos::Chained(p) => if p != INVALID_POS {
+                let next_pos = self.keys[pos as usize].next;
+                self.keys[p as usize].next = next_pos;
+            },
+            PrevPos::Unknown => unreachable!()
+        }
+
+        // Initialize the bucket.
+        let keylink = &mut self.keys[pos as usize];
+        self.buckets_in_use += 1;
+        keylink.next = INVALID_POS;
+        keylink.inner = Some(key);
+        self.vals[pos as usize] = Some(value);

        return Ok(pos);
    }
}
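When `alloc_limit` is set, `alloc_bucket` above has to walk past freelist nodes at or above the limit and splice the chosen node out. The same splice on a bare freelist, as a self-contained toy model:

```rust
// Toy model (not the real code) of the freelist splice in alloc_bucket:
// skip nodes >= limit, then unlink the chosen node from the singly linked list.
const INVALID_POS: u32 = u32::MAX;

fn take_first_under_limit(next: &mut [u32], head: &mut u32, limit: u32) -> Option<u32> {
    let mut prev = INVALID_POS;
    let mut pos = *head;
    while pos != INVALID_POS && pos >= limit {
        prev = pos;
        pos = next[pos as usize];
    }
    if pos == INVALID_POS {
        return None; // freelist exhausted below the limit
    }
    if prev == INVALID_POS {
        *head = next[pos as usize];            // chosen node was the head
    } else {
        next[prev as usize] = next[pos as usize]; // unlink mid-chain
    }
    Some(pos)
}
```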
libs/neon-shmem/src/hash/entry.rs:

@@ -10,62 +10,79 @@ pub enum Entry<'a, 'b, K, V> {
    Vacant(VacantEntry<'a, 'b, K, V>),
}

/// Helper enum representing the previous position within a hashmap chain.
#[derive(Clone, Copy)]
pub(crate) enum PrevPos {
    /// Starting index within the dictionary.
    First(u32),
    /// Regular index within the buckets.
    Chained(u32),
+    /// Unknown - e.g. the associated entry was retrieved by index instead of chain.
+    Unknown,
}

+impl PrevPos {
+    /// Unwrap an index from a `PrevPos::First`, panicking otherwise.
+    pub fn unwrap_first(&self) -> u32 {
+        match self {
+            Self::First(i) => *i,
+            _ => panic!("not first entry in chain")
+        }
+    }
+}
+
pub struct OccupiedEntry<'a, 'b, K, V> {
    pub(crate) map: &'b mut CoreHashMap<'a, K, V>,
-    pub(crate) _key: K, // The key of the occupied entry
+    /// The key of the occupied entry
+    pub(crate) _key: K,
    /// The index of the previous entry in the chain.
    pub(crate) prev_pos: PrevPos,
-    pub(crate) bucket_pos: u32, // The position of the bucket in the CoreHashMap's buckets array
+    /// The position of the bucket in the CoreHashMap's buckets array.
+    pub(crate) bucket_pos: u32,
}

impl<'a, 'b, K, V> OccupiedEntry<'a, 'b, K, V> {
    pub fn get(&self) -> &V {
-        &self.map.buckets[self.bucket_pos as usize]
-            .inner
+        self.map.vals[self.bucket_pos as usize]
            .as_ref()
            .unwrap()
-            .1
    }

    pub fn get_mut(&mut self) -> &mut V {
-        &mut self.map.buckets[self.bucket_pos as usize]
-            .inner
+        self.map.vals[self.bucket_pos as usize]
            .as_mut()
            .unwrap()
-            .1
    }

    pub fn insert(&mut self, value: V) -> V {
-        let bucket = &mut self.map.buckets[self.bucket_pos as usize];
+        let bucket = &mut self.map.vals[self.bucket_pos as usize];
        // This assumes inner is Some, which it must be for an OccupiedEntry
-        let old_value = mem::replace(&mut bucket.inner.as_mut().unwrap().1, value);
+        let old_value = mem::replace(bucket.as_mut().unwrap(), value);
        old_value
    }

    pub fn remove(self) -> V {
-        // CoreHashMap::remove returns Option<(K, V)>. We know it's Some for an OccupiedEntry.
-        let bucket = &mut self.map.buckets[self.bucket_pos as usize];
+        let keylink = &mut self.map.keys[self.bucket_pos as usize];

        // unlink it from the chain
        match self.prev_pos {
-            PrevPos::First(dict_pos) => self.map.dictionary[dict_pos as usize] = bucket.next,
+            PrevPos::First(dict_pos) => self.map.dictionary[dict_pos as usize] = keylink.next,
            PrevPos::Chained(bucket_pos) => {
-                self.map.buckets[bucket_pos as usize].next = bucket.next
-            }
+                self.map.keys[bucket_pos as usize].next = keylink.next
+            },
+            PrevPos::Unknown => panic!("can't safely remove entry with unknown previous entry"),
        }

        // and add it to the freelist
-        let bucket = &mut self.map.buckets[self.bucket_pos as usize];
-        let old_value = bucket.inner.take();
-        bucket.next = self.map.free_head;
+        let keylink = &mut self.map.keys[self.bucket_pos as usize];
+        keylink.inner = None;
+        keylink.next = self.map.free_head;
+        let old_value = self.map.vals[self.bucket_pos as usize].take();
        self.map.free_head = self.bucket_pos;
        self.map.buckets_in_use -= 1;

-        return old_value.unwrap().1;
+        return old_value.unwrap();
    }
}
@@ -81,11 +98,10 @@ impl<'a, 'b, K: Clone + Hash + Eq, V> VacantEntry<'a, 'b, K, V> {
        if pos == INVALID_POS {
            return Err(FullError());
        }
-        let bucket = &mut self.map.buckets[pos as usize];
-        bucket.next = self.map.dictionary[self.dict_pos as usize];
+        self.map.keys[pos as usize].next = self.map.dictionary[self.dict_pos as usize];
        self.map.dictionary[self.dict_pos as usize] = pos;

-        let result = &mut self.map.buckets[pos as usize].inner.as_mut().unwrap().1;
+        let result = self.map.vals[pos as usize].as_mut().unwrap();
        return Ok(result);
    }
}
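All mutation now goes through this Entry API; the basic insert-or-update pattern, as the tests below use it (`writer`, `key`, and `new_val` are stand-ins for caller state):

```rust
// Sketch of the insert-or-update pattern driven by the Entry API.
let hash = writer.get_hash_value(&key);
match writer.entry_with_hash(key, hash) {
    Entry::Occupied(mut e) => { e.insert(new_val); }     // replace, returns old V
    Entry::Vacant(e) => { e.insert(new_val).unwrap(); }  // can fail when map is full
}
```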
libs/neon-shmem/src/hash/optim.rs (new file, 85 lines):
@@ -0,0 +1,85 @@
//! Adapted from https://github.com/jsnell/parallel-xxhash (TODO: license?)

// These are 64-bit AVX2 paths, so the intrinsics come from x86_64 (not x86).
#[cfg(target_arch = "x86_64")]
use core::arch::x86_64::*;

const PRIME32_1: u32 = 2654435761;
const PRIME32_2: u32 = 2246822519;
const PRIME32_3: u32 = 3266489917;
const PRIME32_4: u32 = 668265263;
const PRIME32_5: u32 = 374761393;

#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx2")]
fn mm256_rol32<const R: i32>(x: __m256i) -> __m256i {
    _mm256_or_si256(_mm256_slli_epi32(x, R), _mm256_srli_epi32(x, 32 - R))
}

#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx2")]
fn mm256_fmix32(mut h: __m256i) -> __m256i {
    h = _mm256_xor_si256(h, _mm256_srli_epi32(h, 15));
    h = _mm256_mullo_epi32(h, _mm256_set1_epi32(PRIME32_2 as i32));
    h = _mm256_xor_si256(h, _mm256_srli_epi32(h, 13));
    h = _mm256_mullo_epi32(h, _mm256_set1_epi32(PRIME32_3 as i32));
    h = _mm256_xor_si256(h, _mm256_srli_epi32(h, 16));
    h
}

#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx2")]
fn mm256_round(mut seed: __m256i, input: __m256i) -> __m256i {
    seed = _mm256_add_epi32(
        seed,
        _mm256_mullo_epi32(input, _mm256_set1_epi32(PRIME32_2 as i32)),
    );
    seed = mm256_rol32::<13>(seed);
    seed = _mm256_mullo_epi32(seed, _mm256_set1_epi32(PRIME32_1 as i32));
    seed
}

/// Computes xxHash for 8 keys of size 4*N bytes in column-major order.
///
/// SAFETY: the caller must ensure AVX2 is available and that `keys` points at
/// 8 * N readable u32 words.
#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx2")]
unsafe fn xxhash_many<const N: usize>(keys: *const u32, seed: u32) -> [u32; 8] {
    let mut res = [0u32; 8];
    let mut h = _mm256_set1_epi32(seed.wrapping_add(PRIME32_5) as i32);
    if N >= 4 {
        let mut v1 =
            _mm256_set1_epi32(seed.wrapping_add(PRIME32_1).wrapping_add(PRIME32_2) as i32);
        let mut v2 = _mm256_set1_epi32(seed.wrapping_add(PRIME32_2) as i32);
        let mut v3 = _mm256_set1_epi32(seed as i32);
        let mut v4 = _mm256_set1_epi32(seed.wrapping_sub(PRIME32_1) as i32);
        let mut i = 0;
        while i < (N & !3) {
            let k1 = _mm256_loadu_si256(keys.add((i + 0) * 8).cast());
            let k2 = _mm256_loadu_si256(keys.add((i + 1) * 8).cast());
            let k3 = _mm256_loadu_si256(keys.add((i + 2) * 8).cast());
            let k4 = _mm256_loadu_si256(keys.add((i + 3) * 8).cast());
            v1 = mm256_round(v1, k1);
            v2 = mm256_round(v2, k2);
            v3 = mm256_round(v3, k3);
            v4 = mm256_round(v4, k4);
            i += 4;
        }
        // __m256i has no `+` operator; merge the lanes with _mm256_add_epi32.
        h = _mm256_add_epi32(
            _mm256_add_epi32(mm256_rol32::<1>(v1), mm256_rol32::<7>(v2)),
            _mm256_add_epi32(mm256_rol32::<12>(v3), mm256_rol32::<18>(v4)),
        );
    }

    // Unneeded, keeps bitwise parity with xxhash though.
    h = _mm256_add_epi32(h, _mm256_set1_epi32((N * 4) as i32));

    // Handle the N % 4 trailing words.
    for i in (N & !3)..N {
        let v = _mm256_loadu_si256(keys.add(i * 8).cast());
        h = _mm256_add_epi32(h, _mm256_mullo_epi32(v, _mm256_set1_epi32(PRIME32_3 as i32)));
        h = _mm256_mullo_epi32(mm256_rol32::<17>(h), _mm256_set1_epi32(PRIME32_4 as i32));
    }

    _mm256_storeu_si256((&mut res as *mut _).cast(), mm256_fmix32(h));
    res
}
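`xxhash_many` is compiled only for x86_64 and requires AVX2, so a caller has to prove the feature at runtime (or compile with `-C target-feature=+avx2`). A hedged dispatch sketch; the `hash8` wrapper is hypothetical:

```rust
// Sketch (not part of the diff): runtime dispatch into the AVX2 path.
// `keys` must point at 8 column-major keys of N u32 words each.
#[cfg(target_arch = "x86_64")]
fn hash8<const N: usize>(keys: *const u32, seed: u32) -> Option<[u32; 8]> {
    if is_x86_feature_detected!("avx2") {
        // SAFETY: AVX2 confirmed above; the pointer contract is upheld by the caller.
        Some(unsafe { xxhash_many::<N>(keys, seed) })
    } else {
        None // caller falls back to a scalar hash
    }
}
```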
libs/neon-shmem/src/hash/tests.rs:

@@ -1,11 +1,13 @@
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
-use std::mem::uninitialized;
+use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::hash::HashMapAccess;
use crate::hash::HashMapInit;
-use crate::hash::UpdateAction;
+use crate::hash::Entry;
use crate::shmem::ShmemHandle;

use rand::seq::SliceRandom;
@@ -35,25 +37,29 @@ impl<'a> From<&'a [u8]> for TestKey {
    }
}

fn test_inserts<K: Into<TestKey> + Copy>(keys: &[K]) {
-    const MAX_MEM_SIZE: usize = 10000000;
-    let shmem = ShmemHandle::new("test_inserts", 0, MAX_MEM_SIZE).unwrap();
-
-    let init_struct = HashMapInit::<TestKey, usize>::init_in_shmem(100000, shmem);
-    let w = init_struct.attach_writer();
+    let mut w = HashMapInit::<TestKey, usize>::new_resizeable_named(
+        100000, 120000, "test_inserts"
+    ).attach_writer();

    for (idx, k) in keys.iter().enumerate() {
-        let res = w.insert(&(*k).into(), idx);
-        assert!(res.is_ok());
+        let hash = w.get_hash_value(&(*k).into());
+        let res = w.entry_with_hash((*k).into(), hash);
+        match res {
+            Entry::Occupied(mut e) => { e.insert(idx); }
+            Entry::Vacant(e) => {
+                let res = e.insert(idx);
+                assert!(res.is_ok());
+            },
+        };
    }

    for (idx, k) in keys.iter().enumerate() {
-        let x = w.get(&(*k).into());
+        let hash = w.get_hash_value(&(*k).into());
+        let x = w.get_with_hash(&(*k).into(), hash);
        let value = x.as_deref().copied();
        assert_eq!(value, Some(idx));
    }

    //eprintln!("stats: {:?}", tree_writer.get_statistics());
}

#[test]
@@ -121,11 +127,9 @@ struct TestOp(TestKey, Option<usize>);

fn apply_op(
    op: &TestOp,
-    sut: &HashMapAccess<TestKey, TestValue>,
+    map: &mut HashMapAccess<TestKey, usize>,
    shadow: &mut BTreeMap<TestKey, usize>,
) {
    eprintln!("applying op: {op:?}");

    // apply the change to the shadow tree first
    let shadow_existing = if let Some(v) = op.1 {
        shadow.insert(op.0, v)
@@ -133,33 +137,78 @@ fn apply_op(
        shadow.remove(&op.0)
    };

-    // apply to Art tree
-    sut.update_with_fn(&op.0, |existing| {
-        assert_eq!(existing.map(TestValue::load), shadow_existing);
-
-        match (existing, op.1) {
-            (None, None) => UpdateAction::Nothing,
-            (None, Some(new_val)) => UpdateAction::Insert(TestValue::new(new_val)),
-            (Some(_old_val), None) => UpdateAction::Remove,
-            (Some(old_val), Some(new_val)) => {
-                old_val.0.store(new_val, Ordering::Relaxed);
-                UpdateAction::Nothing
-            }
-        }
-    })
-    .expect("out of memory");
+    let hash = map.get_hash_value(&op.0);
+    let entry = map.entry_with_hash(op.0, hash);
+    let hash_existing = match op.1 {
+        Some(new) => {
+            match entry {
+                Entry::Occupied(mut e) => Some(e.insert(new)),
+                Entry::Vacant(e) => { e.insert(new).unwrap(); None },
+            }
+        },
+        None => {
+            match entry {
+                Entry::Occupied(e) => Some(e.remove()),
+                Entry::Vacant(_) => None,
+            }
+        },
+    };
+
+    assert_eq!(shadow_existing, hash_existing);
}

+fn do_random_ops(
+    num_ops: usize,
+    size: u32,
+    del_prob: f64,
+    writer: &mut HashMapAccess<TestKey, usize>,
+    shadow: &mut BTreeMap<TestKey, usize>,
+    rng: &mut rand::rngs::ThreadRng,
+) {
+    for i in 0..num_ops {
+        let key: TestKey = ((rng.next_u32() % size) as u128).into();
+        let op = TestOp(key, if rng.random_bool(del_prob) { Some(i) } else { None });
+        apply_op(&op, writer, shadow);
+    }
+}
+
+fn do_deletes(
+    num_ops: usize,
+    writer: &mut HashMapAccess<TestKey, usize>,
+    shadow: &mut BTreeMap<TestKey, usize>,
+) {
+    for _ in 0..num_ops {
+        let (k, _) = shadow.pop_first().unwrap();
+        let hash = writer.get_hash_value(&k);
+        writer.remove_with_hash(&k, hash);
+    }
+}
+
+fn do_shrink(
+    writer: &mut HashMapAccess<TestKey, usize>,
+    shadow: &mut BTreeMap<TestKey, usize>,
+    from: u32,
+    to: u32
+) {
+    writer.begin_shrink(to);
+    while writer.get_num_buckets_in_use() > to as usize {
+        let (k, _) = shadow.pop_first().unwrap();
+        let hash = writer.get_hash_value(&k);
+        let entry = writer.entry_with_hash(k, hash);
+        if let Entry::Occupied(e) = entry {
+            e.remove();
+        }
+    }
+    writer.finish_shrink().unwrap();
+}

#[test]
fn random_ops() {
-    const MAX_MEM_SIZE: usize = 10000000;
-    let shmem = ShmemHandle::new("test_inserts", 0, MAX_MEM_SIZE).unwrap();
-
-    let init_struct = HashMapInit::<TestKey, TestValue>::init_in_shmem(100000, shmem);
-    let writer = init_struct.attach_writer();
-
+    let mut writer = HashMapInit::<TestKey, usize>::new_resizeable_named(
+        100000, 120000, "test_random"
+    ).attach_writer();
    let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new();

    let distribution = Zipf::new(u128::MAX as f64, 1.1).unwrap();
    let mut rng = rand::rng();
    for i in 0..100000 {
@@ -167,54 +216,167 @@ fn random_ops() {

        let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });

-        apply_op(&op, &writer, &mut shadow);
+        apply_op(&op, &mut writer, &mut shadow);

        if i % 1000 == 0 {
            eprintln!("{i} ops processed");
            //eprintln!("stats: {:?}", tree_writer.get_statistics());
            //test_iter(&tree_writer, &shadow);
        }
    }
}

+#[test]
+fn test_shuffle() {
+    let mut writer = HashMapInit::<TestKey, usize>::new_resizeable_named(
+        1000, 1200, "test_shuf"
+    ).attach_writer();
+    let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new();
+    let mut rng = rand::rng();
+
+    do_random_ops(10000, 1000, 0.75, &mut writer, &mut shadow, &mut rng);
+    writer.shuffle();
+    do_random_ops(10000, 1000, 0.75, &mut writer, &mut shadow, &mut rng);
+}

#[test]
fn test_grow() {
-    const MEM_SIZE: usize = 10000000;
-    let shmem = ShmemHandle::new("test_grow", 0, MEM_SIZE).unwrap();
-
-    let init_struct = HashMapInit::<TestKey, TestValue>::init_in_shmem(1000, shmem);
-    let writer = init_struct.attach_writer();
-
+    let mut writer = HashMapInit::<TestKey, usize>::new_resizeable_named(
+        1000, 2000, "test_grow"
+    ).attach_writer();
    let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new();
    let mut rng = rand::rng();

-    for i in 0..10000 {
-        let key: TestKey = ((rng.next_u32() % 1000) as u128).into();
-
-        let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });
-
-        apply_op(&op, &writer, &mut shadow);
-
-        if i % 1000 == 0 {
-            eprintln!("{i} ops processed");
-            //eprintln!("stats: {:?}", tree_writer.get_statistics());
-            //test_iter(&tree_writer, &shadow);
-        }
-    }
-
+    do_random_ops(10000, 1000, 0.75, &mut writer, &mut shadow, &mut rng);
    writer.grow(1500).unwrap();
-
-    for i in 0..10000 {
-        let key: TestKey = ((rng.next_u32() % 1500) as u128).into();
-
-        let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });
-
-        apply_op(&op, &writer, &mut shadow);
-
-        if i % 1000 == 0 {
-            eprintln!("{i} ops processed");
-            //eprintln!("stats: {:?}", tree_writer.get_statistics());
-            //test_iter(&tree_writer, &shadow);
-        }
-    }
+    do_random_ops(10000, 1500, 0.75, &mut writer, &mut shadow, &mut rng);
}

+#[test]
+fn test_shrink() {
+    let mut writer = HashMapInit::<TestKey, usize>::new_resizeable_named(
+        1500, 2000, "test_shrink"
+    ).attach_writer();
+    let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new();
+    let mut rng = rand::rng();
+
+    do_random_ops(10000, 1500, 0.75, &mut writer, &mut shadow, &mut rng);
+    do_shrink(&mut writer, &mut shadow, 1500, 1000);
+    do_deletes(500, &mut writer, &mut shadow);
+    do_random_ops(10000, 500, 0.75, &mut writer, &mut shadow, &mut rng);
+    assert!(writer.get_num_buckets_in_use() <= 1000);
+}
+
+#[test]
+fn test_shrink_grow_seq() {
+    let mut writer = HashMapInit::<TestKey, usize>::new_resizeable_named(
+        1000, 20000, "test_grow_seq"
+    ).attach_writer();
+    let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new();
+    let mut rng = rand::rng();
+
+    do_random_ops(500, 1000, 0.1, &mut writer, &mut shadow, &mut rng);
+    eprintln!("Shrinking to 750");
+    do_shrink(&mut writer, &mut shadow, 1000, 750);
+    do_random_ops(200, 1000, 0.5, &mut writer, &mut shadow, &mut rng);
+    eprintln!("Growing to 1500");
+    writer.grow(1500).unwrap();
+    do_random_ops(600, 1500, 0.1, &mut writer, &mut shadow, &mut rng);
+    eprintln!("Shrinking to 200");
+    do_shrink(&mut writer, &mut shadow, 1500, 200);
+    do_deletes(100, &mut writer, &mut shadow);
+    do_random_ops(50, 1500, 0.25, &mut writer, &mut shadow, &mut rng);
+    eprintln!("Growing to 10k");
+    writer.grow(10000).unwrap();
+    do_random_ops(10000, 5000, 0.25, &mut writer, &mut shadow, &mut rng);
+}
+
+#[test]
+fn test_bucket_ops() {
+    let mut writer = HashMapInit::<TestKey, usize>::new_resizeable_named(
+        1000, 1200, "test_bucket_ops"
+    ).attach_writer();
+    let hash = writer.get_hash_value(&1.into());
+    match writer.entry_with_hash(1.into(), hash) {
+        Entry::Occupied(mut e) => { e.insert(2); },
+        Entry::Vacant(e) => { e.insert(2).unwrap(); },
+    }
+    assert_eq!(writer.get_num_buckets_in_use(), 1);
+    assert_eq!(writer.get_num_buckets(), 1000);
+    assert_eq!(writer.get_with_hash(&1.into(), hash), Some(&2));
+    match writer.entry_with_hash(1.into(), hash) {
+        Entry::Occupied(e) => {
+            assert_eq!(e._key, 1.into());
+            let pos = e.bucket_pos as usize;
+            assert_eq!(writer.entry_at_bucket(pos).unwrap()._key, 1.into());
+            // get_at_bucket now returns Option<(&K, &V)>, not Option<&(K, V)>:
+            assert_eq!(writer.get_at_bucket(pos), Some((&1.into(), &2)));
+        },
+        Entry::Vacant(_) => { panic!("Insert didn't affect entry"); },
+    }
+    writer.remove_with_hash(&1.into(), hash);
+    assert_eq!(writer.get_with_hash(&1.into(), hash), None);
+}
+
+#[test]
+fn test_shrink_zero() {
+    let mut writer = HashMapInit::<TestKey, usize>::new_resizeable_named(
+        1500, 2000, "test_shrink_zero"
+    ).attach_writer();
+    writer.begin_shrink(0);
+    for i in 0..1500 {
+        writer.entry_at_bucket(i).map(|x| x.remove());
+    }
+    writer.finish_shrink().unwrap();
+    assert_eq!(writer.get_num_buckets_in_use(), 0);
+    let hash = writer.get_hash_value(&1.into());
+    let entry = writer.entry_with_hash(1.into(), hash);
+    if let Entry::Vacant(v) = entry {
+        assert!(v.insert(2).is_err());
+    } else {
+        panic!("Somehow got non-vacant entry in empty map.")
+    }
+    writer.grow(50).unwrap();
+    let entry = writer.entry_with_hash(1.into(), hash);
+    if let Entry::Vacant(v) = entry {
+        assert!(v.insert(2).is_ok());
+    } else {
+        panic!("Somehow got non-vacant entry in empty map.")
+    }
+    assert_eq!(writer.get_num_buckets_in_use(), 1);
+}
+
+#[test]
+#[should_panic]
+fn test_grow_oom() {
+    let mut writer = HashMapInit::<TestKey, usize>::new_resizeable_named(
+        1500, 2000, "test_grow_oom"
+    ).attach_writer();
+    writer.grow(20000).unwrap();
+}
+
+#[test]
+#[should_panic]
+fn test_shrink_bigger() {
+    let mut writer = HashMapInit::<TestKey, usize>::new_resizeable_named(
+        1500, 2500, "test_shrink_bigger"
+    ).attach_writer();
+    writer.begin_shrink(2000);
+}
+
+#[test]
+#[should_panic]
+fn test_shrink_early_finish() {
+    let mut writer = HashMapInit::<TestKey, usize>::new_resizeable_named(
+        1500, 2500, "test_shrink_early_finish"
+    ).attach_writer();
+    writer.finish_shrink().unwrap();
+}
+
+#[test]
+#[should_panic]
+fn test_shrink_fixed_size() {
+    let mut area = [MaybeUninit::uninit(); 10000];
+    let init_struct = HashMapInit::<TestKey, usize>::with_fixed(3, &mut area);
+    let mut writer = init_struct.attach_writer();
+    writer.begin_shrink(1);
+}