diff --git a/Cargo.lock b/Cargo.lock index bafcaea594..2f50f11cee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3843,9 +3843,11 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" name = "neon-shmem" version = "0.1.0" dependencies = [ + "criterion", "nix 0.30.1", "rand 0.9.1", "rand_distr 0.5.1", + "rustc-hash 1.1.0", "tempfile", "thiserror 1.0.69", "workspace_hack", diff --git a/libs/neon-shmem/Cargo.toml b/libs/neon-shmem/Cargo.toml index de65f3e8cc..284a19d55d 100644 --- a/libs/neon-shmem/Cargo.toml +++ b/libs/neon-shmem/Cargo.toml @@ -8,10 +8,22 @@ license.workspace = true thiserror.workspace = true nix.workspace = true workspace_hack = { version = "0.1", path = "../../workspace_hack" } +rustc-hash = { version = "2.1.1" } [dev-dependencies] +criterion = { workspace = true, features = ["html_reports"] } rand = "0.9.1" rand_distr = "0.5.1" +xxhash-rust = { version = "0.8.15", features = ["xxh3"] } +ahash.workspace = true [target.'cfg(target_os = "macos")'.dependencies] tempfile = "3.14.0" + +[[bench]] +name = "hmap_resize" +harness = false + +[[bin]] +name = "hmap_test" +path = "main.rs" diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs index f173f42a6d..364787e2b7 100644 --- a/libs/neon-shmem/src/hash.rs +++ b/libs/neon-shmem/src/hash.rs @@ -8,9 +8,11 @@ //! [ ] Resizable use std::fmt::Debug; -use std::hash::{DefaultHasher, Hash, Hasher}; +use std::hash::{Hash, Hasher, BuildHasher}; use std::mem::MaybeUninit; +use rustc_hash::FxBuildHasher; + use crate::shmem::ShmemHandle; mod core; @@ -25,29 +27,32 @@ use entry::{Entry, OccupiedEntry, PrevPos}; #[derive(Debug)] pub struct OutOfMemoryError(); -pub struct HashMapInit<'a, K, V> { +pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> { // Hash table can be allocated in a fixed memory area, or in a resizeable ShmemHandle. shmem_handle: Option, shared_ptr: *mut HashMapShared<'a, K, V>, + hasher: S, } -pub struct HashMapAccess<'a, K, V> { +pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> { shmem_handle: Option, shared_ptr: *mut HashMapShared<'a, K, V>, + hasher: S, } -unsafe impl<'a, K: Sync, V: Sync> Sync for HashMapAccess<'a, K, V> {} -unsafe impl<'a, K: Send, V: Send> Send for HashMapAccess<'a, K, V> {} +unsafe impl<'a, K: Sync, V: Sync, S> Sync for HashMapAccess<'a, K, V, S> {} +unsafe impl<'a, K: Send, V: Send, S> Send for HashMapAccess<'a, K, V, S> {} -impl<'a, K, V> HashMapInit<'a, K, V> { - pub fn attach_writer(self) -> HashMapAccess<'a, K, V> { +impl<'a, K, V, S> HashMapInit<'a, K, V, S> { + pub fn attach_writer(self) -> HashMapAccess<'a, K, V, S> { HashMapAccess { shmem_handle: self.shmem_handle, shared_ptr: self.shared_ptr, + hasher: self.hasher, } } - pub fn attach_reader(self) -> HashMapAccess<'a, K, V> { + pub fn attach_reader(self) -> HashMapAccess<'a, K, V, S> { // no difference to attach_writer currently self.attach_writer() } @@ -65,42 +70,60 @@ impl<'a, K, V> HashMapInit<'a, K, V> { /// /// In between the above parts, there can be padding bytes to align the parts correctly. struct HashMapShared<'a, K, V> { - inner: CoreHashMap<'a, K, V>, + inner: CoreHashMap<'a, K, V> } -impl<'a, K, V> HashMapInit<'a, K, V> +impl<'a, K, V> HashMapInit<'a, K, V, rustc_hash::FxBuildHasher> where - K: Clone + Hash + Eq, + K: Clone + Hash + Eq +{ + pub fn init_in_fixed_area( + num_buckets: u32, + area: &'a mut [MaybeUninit], + ) -> HashMapInit<'a, K, V> { + Self::init_in_fixed_area_with_hasher(num_buckets, area, rustc_hash::FxBuildHasher::default()) + } + + /// Initialize a new hash map in the given shared memory area + pub fn init_in_shmem(num_buckets: u32, shmem: ShmemHandle) -> HashMapInit<'a, K, V> { + Self::init_in_shmem_with_hasher(num_buckets, shmem, rustc_hash::FxBuildHasher::default()) + } +} + +impl<'a, K, V, S: BuildHasher> HashMapInit<'a, K, V, S> +where + K: Clone + Hash + Eq { pub fn estimate_size(num_buckets: u32) -> usize { // add some margin to cover alignment etc. CoreHashMap::::estimate_size(num_buckets) + size_of::>() + 1000 } - - pub fn init_in_fixed_area( - num_buckets: u32, - area: &'a mut [MaybeUninit], - ) -> HashMapInit<'a, K, V> { - Self::init_common(num_buckets, None, area.as_mut_ptr().cast(), area.len()) - } - - /// Initialize a new hash map in the given shared memory area - pub fn init_in_shmem(num_buckets: u32, mut shmem: ShmemHandle) -> HashMapInit<'a, K, V> { + + pub fn init_in_shmem_with_hasher(num_buckets: u32, mut shmem: ShmemHandle, hasher: S) -> HashMapInit<'a, K, V, S> { let size = Self::estimate_size(num_buckets); shmem .set_size(size) .expect("could not resize shared memory area"); let ptr = unsafe { shmem.data_ptr.as_mut() }; - Self::init_common(num_buckets, Some(shmem), ptr, size) + Self::init_common(num_buckets, Some(shmem), ptr, size, hasher) } + pub fn init_in_fixed_area_with_hasher( + num_buckets: u32, + area: &'a mut [MaybeUninit], + hasher: S, + ) -> HashMapInit<'a, K, V, S> { + Self::init_common(num_buckets, None, area.as_mut_ptr().cast(), area.len(), hasher) + } + fn init_common( num_buckets: u32, shmem_handle: Option, area_ptr: *mut u8, area_len: usize, - ) -> HashMapInit<'a, K, V> { + hasher: S, + ) -> HashMapInit<'a, K, V, S> { // carve out the HashMapShared struct from the area. let mut ptr: *mut u8 = area_ptr; let end_ptr: *mut u8 = unsafe { area_ptr.add(area_len) }; @@ -133,18 +156,17 @@ where HashMapInit { shmem_handle, shared_ptr, + hasher, } } } -impl<'a, K, V> HashMapAccess<'a, K, V> +impl<'a, K, V, S: BuildHasher> HashMapAccess<'a, K, V, S> where K: Clone + Hash + Eq, { pub fn get_hash_value(&self, key: &K) -> u64 { - let mut hasher = DefaultHasher::new(); - key.hash(&mut hasher); - hasher.finish() + self.hasher.hash_one(key) } pub fn get_with_hash<'e>(&'e self, key: &K, hash: u64) -> Option<&'e V> { @@ -210,6 +232,12 @@ where map.inner.buckets_in_use as usize } + pub fn clear(&mut self) { + let map = unsafe { self.shared_ptr.as_mut() }.unwrap(); + let inner = &mut map.inner; + inner.clear() + } + /// Helper function that abstracts the common logic between growing and shrinking. /// The only significant difference in the rehashing step is how many buckets to rehash. fn rehash_dict( @@ -243,10 +271,7 @@ where continue; } - let mut hasher = DefaultHasher::new(); - buckets[i].inner.as_ref().unwrap().0.hash(&mut hasher); - let hash = hasher.finish(); - + let hash = self.hasher.hash_one(&buckets[i].inner.as_ref().unwrap().0); let pos: usize = (hash % dictionary.len() as u64) as usize; buckets[i].next = dictionary[pos]; dictionary[pos] = i as u32; @@ -256,6 +281,23 @@ where inner.dictionary = dictionary; inner.buckets = buckets; } + + /// Rehash the map. Intended for benchmarking only. + pub fn shuffle(&mut self) { + let map = unsafe { self.shared_ptr.as_mut() }.unwrap(); + let inner = &mut map.inner; + + let shmem_handle = self + .shmem_handle + .as_ref() + .expect("TODO(quantumish): make shuffle work w/ fixed-size table"); + let num_buckets = inner.get_num_buckets() as u32; + let size_bytes = HashMapInit::::estimate_size(num_buckets); + let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) }; + let buckets_ptr = inner.buckets.as_mut_ptr(); + self.rehash_dict(inner, buckets_ptr, end_ptr, num_buckets, num_buckets); + } + /// Grow /// @@ -278,7 +320,7 @@ where .as_ref() .expect("grow called on a fixed-size hash table"); - let size_bytes = HashMapInit::::estimate_size(num_buckets); + let size_bytes = HashMapInit::::estimate_size(num_buckets); shmem_handle.set_size(size_bytes)?; let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) }; @@ -371,7 +413,7 @@ where .as_ref() .expect("shrink called on a fixed-size hash table"); - let size_bytes = HashMapInit::::estimate_size(num_buckets); + let size_bytes = HashMapInit::::estimate_size(num_buckets); shmem_handle.set_size(size_bytes)?; let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) }; let buckets_ptr = inner.buckets.as_mut_ptr(); diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs index 80f41fd8d4..049600e538 100644 --- a/libs/neon-shmem/src/hash/core.rs +++ b/libs/neon-shmem/src/hash/core.rs @@ -50,7 +50,7 @@ where as usize; size - } + } pub fn new( buckets: &'a mut [MaybeUninit>], @@ -160,6 +160,33 @@ where self.alloc_limit != INVALID_POS } + + // TODO(quantumish): How does this interact with an ongoing shrink? + pub fn clear(&mut self) { + for i in 0..self.buckets.len() { + self.buckets[i] = Bucket { + next: if i < self.buckets.len() - 1 { + i as u32 + 1 + } else { + INVALID_POS + }, + prev: if i > 0 { + PrevPos::Chained(i as u32 - 1) + } else { + PrevPos::First(INVALID_POS) + }, + inner: None, + } + } + + for i in 0..self.dictionary.len() { + self.dictionary[i] = INVALID_POS; + } + + self.buckets_in_use = 0; + self.alloc_limit = INVALID_POS; + } + pub fn entry_at_bucket(&mut self, pos: usize) -> Option> { if pos >= self.buckets.len() { return None;