diff --git a/Cargo.lock b/Cargo.lock index bb89c8a92a..77d83c7a71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2421,12 +2421,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foldhash" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" - [[package]] name = "form_urlencoded" version = "1.2.1" @@ -2794,16 +2788,6 @@ version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" -[[package]] -name = "hashbrown" -version = "0.15.4" -source = "git+https://github.com/quantumish/hashbrown.git?rev=6610e6d#6610e6d2b1f288ef7b0709a3efefbc846395dc5e" -dependencies = [ - "allocator-api2", - "equivalent", - "foldhash", -] - [[package]] name = "hashlink" version = "0.9.1" @@ -3863,7 +3847,7 @@ dependencies = [ "prometheus", "rand 0.8.5", "rand_distr 0.4.3", - "twox-hash 1.6.3", + "twox-hash", ] [[package]] @@ -3950,22 +3934,15 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" name = "neon-shmem" version = "0.1.0" dependencies = [ - "ahash", - "criterion", - "foldhash", - "hashbrown 0.15.4", "libc", "lock_api", "nix 0.30.1", "rand 0.9.1", "rand_distr 0.5.1", "rustc-hash 2.1.1", - "seahash", "tempfile", "thiserror 1.0.69", - "twox-hash 2.1.1", "workspace_hack", - "xxhash-rust", ] [[package]] @@ -4559,7 +4536,7 @@ dependencies = [ "tower 0.5.2", "tracing", "tracing-utils", - "twox-hash 1.6.3", + "twox-hash", "url", "utils", "uuid", @@ -4783,7 +4760,7 @@ dependencies = [ "paste", "seq-macro", "thrift", - "twox-hash 1.6.3", + "twox-hash", "zstd", "zstd-sys", ] @@ -5517,7 +5494,7 @@ dependencies = [ "reqwest-tracing", "rsa", "rstest", - "rustc-hash 1.1.0", + "rustc-hash 2.1.1", "rustls 0.23.27", 
"rustls-native-certs 0.8.0", "rustls-pemfile 2.1.1", @@ -6528,12 +6505,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "621e3680f3e07db4c9c2c3fb07c6223ab2fab2e54bd3c04c3ae037990f428c32" -[[package]] -name = "seahash" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" - [[package]] name = "sec1" version = "0.3.0" @@ -8254,15 +8225,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "twox-hash" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b907da542cbced5261bd3256de1b3a1bf340a3d37f93425a07362a1d687de56" -dependencies = [ - "rand 0.9.1", -] - [[package]] name = "typed-json" version = "0.1.1" @@ -9128,12 +9090,6 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd" -[[package]] -name = "xxhash-rust" -version = "0.8.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" - [[package]] name = "yasna" version = "0.5.2" diff --git a/Cargo.toml b/Cargo.toml index 175328fa46..8e6327a974 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,6 +131,7 @@ jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] } jsonwebtoken = "9" lasso = "0.7" libc = "0.2" +lock_api = "0.4.13" md5 = "0.7.0" measured = { version = "0.0.22", features=["lasso"] } measured-process = { version = "0.0.22" } @@ -167,7 +168,7 @@ reqwest-middleware = "0.4" reqwest-retry = "0.7" routerify = "3" rpds = "0.13" -rustc-hash = "1.1.0" +rustc-hash = "2.1.1" rustls = { version = "0.23.16", default-features = false } rustls-pemfile = "2" rustls-pki-types = "1.11" diff --git a/libs/neon-shmem/Cargo.toml b/libs/neon-shmem/Cargo.toml index 8ce5b52deb..cf3b082613 100644 --- 
a/libs/neon-shmem/Cargo.toml +++ b/libs/neon-shmem/Cargo.toml @@ -6,27 +6,15 @@ license.workspace = true [dependencies] thiserror.workspace = true -nix.workspace = true +nix.workspace=true workspace_hack = { version = "0.1", path = "../../workspace_hack" } -rustc-hash = { version = "2.1.1" } -rand = "0.9.1" libc.workspace = true -lock_api = "0.4.13" - -[dev-dependencies] -criterion = { workspace = true, features = ["html_reports"] } -rand_distr = "0.5.1" -xxhash-rust = { version = "0.8.15", features = ["xxh3"] } -ahash.workspace = true -twox-hash = { version = "2.1.1" } -seahash = "4.1.0" -hashbrown = { git = "https://github.com/quantumish/hashbrown.git", rev = "6610e6d" } -foldhash = "0.1.5" - +lock_api.workspace = true +rustc-hash.workspace = true [target.'cfg(target_os = "macos")'.dependencies] tempfile = "3.14.0" -[[bench]] -name = "hmap_resize" -harness = false +[dev-dependencies] +rand = "0.9" +rand_distr = "0.5.1" \ No newline at end of file diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs index 84c2be3637..58726b9ba3 100644 --- a/libs/neon-shmem/src/hash.rs +++ b/libs/neon-shmem/src/hash.rs @@ -13,6 +13,8 @@ //! This map is resizable (if initialized on top of a [`ShmemHandle`]). Both growing and shrinking happen //! in-place and are at a high level achieved by expanding/reducing the bucket array and rebuilding the //! dictionary by rehashing all keys. +//! +//! Concurrency is managed very simply: the entire map is guarded by one shared-memory RwLock. use std::hash::{BuildHasher, Hash}; use std::mem::MaybeUninit; @@ -29,6 +31,19 @@ mod tests; use core::{Bucket, CoreHashMap, INVALID_POS}; use entry::{Entry, OccupiedEntry, PrevPos, VacantEntry}; +use thiserror::Error; + +/// Error type for a hashmap shrink operation. +#[derive(Error, Debug)] +pub enum HashMapShrinkError { + /// There was an error encountered while resizing the memory area. 
+ #[error("shmem resize failed: {0}")] + ResizeError(shmem::Error), + /// Occupied entries in to-be-shrunk space were encountered beginning at the given index. + #[error("occupied entry in deallocated space found at {0}")] + RemainingEntries(usize), +} + /// This represents a hash table that (possibly) lives in shared memory. /// If a new process is launched with fork(), the child process inherits /// this struct. @@ -116,8 +131,8 @@ impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> { }; let hashmap = CoreHashMap::new(buckets, dictionary); - let lock = RwLock::from_raw(PthreadRwLock::new(raw_lock_ptr.cast()), hashmap); unsafe { + let lock = RwLock::from_raw(PthreadRwLock::new(raw_lock_ptr.cast()), hashmap); std::ptr::write(shared_ptr, lock); } @@ -140,6 +155,9 @@ impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> { } /// Initialize a table for reading. Currently identical to [`HashMapInit::attach_writer`]. + /// + /// This is a holdover from a previous implementation and is being kept around for + /// backwards compatibility reasons. pub fn attach_reader(self) -> HashMapAccess<'a, K, V, S> { self.attach_writer() } @@ -153,8 +171,8 @@ impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> { /// /// [`libc::pthread_rwlock_t`] /// [`HashMapShared`] -/// [buckets] -/// [dictionary] +/// buckets +/// dictionary /// /// In between the above parts, there can be padding bytes to align the parts correctly. type HashMapShared<'a, K, V> = RwLock>; @@ -279,6 +297,9 @@ where } /// Get a reference to the entry containing a key. + /// + /// NB: This takes a write lock as there's no way to distinguish whether the intention + /// is to use the entry for reading or for writing in advance. pub fn entry(&self, key: K) -> Entry<'a, '_, K, V> { let hash = self.get_hash_value(&key); self.entry_with_hash(key, hash) @@ -286,7 +307,7 @@ where /// Remove a key given its hash. Returns the associated value if it existed. 
pub fn remove(&self, key: &K) -> Option { - let hash = self.get_hash_value(&key); + let hash = self.get_hash_value(key); match self.entry_with_hash(key.clone(), hash) { Entry::Occupied(e) => Some(e.remove()), Entry::Vacant(_) => None, @@ -324,7 +345,7 @@ where Some((key, _)) => Some(OccupiedEntry { _key: key.clone(), bucket_pos: pos as u32, - prev_pos: entry::PrevPos::Unknown(self.get_hash_value(&key)), + prev_pos: entry::PrevPos::Unknown(self.get_hash_value(key)), map, }), _ => None, @@ -519,12 +540,7 @@ where /// The following cases result in a panic: /// - Calling this function on a map initialized with [`HashMapInit::with_fixed`]. /// - Calling this function on a map when no shrink operation is in progress. - /// - Calling this function on a map with `shrink_mode` set to [`HashMapShrinkMode::Remap`] and - /// there are more buckets in use than the value returned by [`HashMapAccess::shrink_goal`]. - /// - /// # Errors - /// Returns an [`shmem::Error`] if any errors occur resizing the memory region. 
- pub fn finish_shrink(&self) -> Result<(), shmem::Error> { + pub fn finish_shrink(&self) -> Result<(), HashMapShrinkError> { let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write(); assert!( map.alloc_limit != INVALID_POS, @@ -543,10 +559,8 @@ where ); for i in (num_buckets as usize)..map.buckets.len() { - if let Some((k, v)) = map.buckets[i].inner.take() { - // alloc_bucket increases count, so need to decrease since we're just moving - map.buckets_in_use -= 1; - map.alloc_bucket(k, v).unwrap(); + if map.buckets[i].inner.is_some() { + return Err(HashMapShrinkError::RemainingEntries(i)); } } @@ -556,7 +570,9 @@ where .expect("shrink called on a fixed-size hash table"); let size_bytes = HashMapInit::::estimate_size(num_buckets); - shmem_handle.set_size(size_bytes)?; + if let Err(e) = shmem_handle.set_size(size_bytes) { + return Err(HashMapShrinkError::ResizeError(e)); + } let end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) }; let buckets_ptr = map.buckets.as_mut_ptr(); self.rehash_dict(&mut map, buckets_ptr, end_ptr, num_buckets, num_buckets); diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs index 013eb9a09c..4665c36adb 100644 --- a/libs/neon-shmem/src/hash/core.rs +++ b/libs/neon-shmem/src/hash/core.rs @@ -29,14 +29,11 @@ pub(crate) struct CoreHashMap<'a, K, V> { pub(crate) alloc_limit: u32, /// The number of currently occupied buckets. pub(crate) buckets_in_use: u32, - // pub(crate) lock: libc::pthread_mutex_t, - // Unclear what the purpose of this is. - pub(crate) _user_list_head: u32, } /// Error for when there are no empty buckets left but one is needed. 
#[derive(Debug, PartialEq)] -pub struct FullError(); +pub struct FullError; impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> { const FILL_FACTOR: f32 = 0.60; @@ -88,7 +85,6 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> { buckets, free_head: 0, buckets_in_use: 0, - _user_list_head: INVALID_POS, alloc_limit: INVALID_POS, } } @@ -149,7 +145,7 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> { pos = bucket.next; } if pos == INVALID_POS { - return Err(FullError()); + return Err(FullError); } // Repair the freelist. diff --git a/libs/neon-shmem/src/hash/entry.rs b/libs/neon-shmem/src/hash/entry.rs index bf2f63fe9c..560a20db1d 100644 --- a/libs/neon-shmem/src/hash/entry.rs +++ b/libs/neon-shmem/src/hash/entry.rs @@ -61,10 +61,6 @@ impl OccupiedEntry<'_, '_, K, V> { /// /// This may result in multiple bucket accesses if the entry was obtained by index as the /// previous chain entry needs to be discovered in this case. - /// - /// # Panics - /// Panics if the `prev_pos` field is equal to [`PrevPos::Unknown`]. In practice, this means - /// the entry was obtained via calling something like [`CoreHashMap::entry_at_bucket`]. pub fn remove(mut self) -> V { // If this bucket was queried by index, go ahead and follow its chain from the start. let prev = if let PrevPos::Unknown(hash) = self.prev_pos { @@ -90,7 +86,6 @@ impl OccupiedEntry<'_, '_, K, V> { self.map.dictionary[dict_pos as usize] = bucket.next; } PrevPos::Chained(bucket_pos) => { - // println!("we think prev of {} is {bucket_pos}", self.bucket_pos); self.map.buckets[bucket_pos as usize].next = bucket.next; } _ => unreachable!(), @@ -125,9 +120,6 @@ impl<'b, K: Clone + Hash + Eq, V> VacantEntry<'_, 'b, K, V> { /// Will return [`FullError`] if there are no unoccupied buckets in the map. 
pub fn insert(mut self, value: V) -> Result, FullError> { let pos = self.map.alloc_bucket(self.key, value)?; - if pos == INVALID_POS { - return Err(FullError()); - } self.map.buckets[pos as usize].next = self.map.dictionary[self.dict_pos as usize]; self.map.dictionary[self.dict_pos as usize] = pos; diff --git a/libs/neon-shmem/src/hash/tests.rs b/libs/neon-shmem/src/hash/tests.rs index aee47a0b3e..92233e8140 100644 --- a/libs/neon-shmem/src/hash/tests.rs +++ b/libs/neon-shmem/src/hash/tests.rs @@ -164,16 +164,16 @@ fn do_deletes( fn do_shrink( writer: &mut HashMapAccess, shadow: &mut BTreeMap, + from: u32, to: u32, ) { assert!(writer.shrink_goal().is_none()); writer.begin_shrink(to); assert_eq!(writer.shrink_goal(), Some(to as usize)); - while writer.get_num_buckets_in_use() > to as usize { - let (k, _) = shadow.pop_first().unwrap(); - let entry = writer.entry(k); - if let Entry::Occupied(e) = entry { - e.remove(); + for i in to..from { + if let Some(entry) = writer.entry_at_bucket(i as usize) { + shadow.remove(&entry._key); + entry.remove(); } } let old_usage = writer.get_num_buckets_in_use(); @@ -298,7 +298,7 @@ fn test_shrink() { let mut rng = rand::rng(); do_random_ops(10000, 1500, 0.75, &mut writer, &mut shadow, &mut rng); - do_shrink(&mut writer, &mut shadow, 1000); + do_shrink(&mut writer, &mut shadow, 1500, 1000); assert_eq!(writer.get_num_buckets(), 1000); do_deletes(500, &mut writer, &mut shadow); do_random_ops(10000, 500, 0.75, &mut writer, &mut shadow, &mut rng); @@ -315,7 +315,7 @@ fn test_shrink_grow_seq() { do_random_ops(500, 1000, 0.1, &mut writer, &mut shadow, &mut rng); eprintln!("Shrinking to 750"); - do_shrink(&mut writer, &mut shadow, 750); + do_shrink(&mut writer, &mut shadow, 1000, 750); do_random_ops(200, 1000, 0.5, &mut writer, &mut shadow, &mut rng); eprintln!("Growing to 1500"); writer.grow(1500).unwrap(); @@ -324,7 +324,7 @@ fn test_shrink_grow_seq() { while shadow.len() > 100 { do_deletes(1, &mut writer, &mut shadow); } - do_shrink(&mut 
writer, &mut shadow, 200); + do_shrink(&mut writer, &mut shadow, 1500, 200); do_random_ops(50, 1500, 0.25, &mut writer, &mut shadow, &mut rng); eprintln!("Growing to 10k"); writer.grow(10000).unwrap(); @@ -349,8 +349,7 @@ fn test_bucket_ops() { let pos = match writer.entry(1.into()) { Entry::Occupied(e) => { assert_eq!(e._key, 1.into()); - let pos = e.bucket_pos as usize; - pos + e.bucket_pos as usize } Entry::Vacant(_) => { panic!("Insert didn't affect entry"); diff --git a/libs/neon-shmem/src/sync.rs b/libs/neon-shmem/src/sync.rs index 5a296b4047..95719778ba 100644 --- a/libs/neon-shmem/src/sync.rs +++ b/libs/neon-shmem/src/sync.rs @@ -6,7 +6,7 @@ use std::ptr::NonNull; use nix::errno::Errno; pub type RwLock = lock_api::RwLock; -pub(crate) type RwLockReadGuard<'a, T> = lock_api::RwLockReadGuard<'a, PthreadRwLock, T>; +pub type RwLockReadGuard<'a, T> = lock_api::RwLockReadGuard<'a, PthreadRwLock, T>; pub type RwLockWriteGuard<'a, T> = lock_api::RwLockWriteGuard<'a, PthreadRwLock, T>; pub type ValueReadGuard<'a, T> = lock_api::MappedRwLockReadGuard<'a, PthreadRwLock, T>; pub type ValueWriteGuard<'a, T> = lock_api::MappedRwLockWriteGuard<'a, PthreadRwLock, T>; @@ -14,19 +14,34 @@ pub type ValueWriteGuard<'a, T> = lock_api::MappedRwLockWriteGuard<'a, PthreadRw /// Shared memory read-write lock. pub struct PthreadRwLock(Option>); +/// Simple macro that calls a function in the libc namespace and panics if return value is nonzero. +macro_rules! libc_checked { + ($fn_name:ident ( $($arg:expr),* )) => {{ + let res = libc::$fn_name($($arg),*); + if res != 0 { + panic!("{} failed with {}", stringify!($fn_name), Errno::from_raw(res)); + } + }}; +} + impl PthreadRwLock { - pub fn new(lock: *mut libc::pthread_rwlock_t) -> Self { + /// Creates a new `PthreadRwLock` on top of a pointer to a pthread rwlock. + /// + /// # Safety + /// `lock` must be non-null. Every unsafe operation will panic in the event of an error. 
+ pub unsafe fn new(lock: *mut libc::pthread_rwlock_t) -> Self { unsafe { let mut attrs = MaybeUninit::uninit(); - // Ignoring return value here - only possible error is OOM. - libc::pthread_rwlockattr_init(attrs.as_mut_ptr()); - libc::pthread_rwlockattr_setpshared(attrs.as_mut_ptr(), libc::PTHREAD_PROCESS_SHARED); - // TODO(quantumish): worth making this function return Result? - libc::pthread_rwlock_init(lock, attrs.as_mut_ptr()); + libc_checked!(pthread_rwlockattr_init(attrs.as_mut_ptr())); + libc_checked!(pthread_rwlockattr_setpshared( + attrs.as_mut_ptr(), + libc::PTHREAD_PROCESS_SHARED + )); + libc_checked!(pthread_rwlock_init(lock, attrs.as_mut_ptr())); // Safety: POSIX specifies that "any function affecting the attributes // object (including destruction) shall not affect any previously // initialized read-write locks". - libc::pthread_rwlockattr_destroy(attrs.as_mut_ptr()); + libc_checked!(pthread_rwlockattr_destroy(attrs.as_mut_ptr())); Self(Some(NonNull::new_unchecked(lock))) } } @@ -34,7 +49,7 @@ impl PthreadRwLock { fn inner(&self) -> NonNull { match self.0 { None => { - panic!("PthreadRwLock constructed badly - something likely used RawMutex::INIT") + panic!("PthreadRwLock constructed badly - something likely used RawRwLock::INIT") } Some(x) => x, } @@ -45,31 +60,16 @@ unsafe impl lock_api::RawRwLock for PthreadRwLock { type GuardMarker = lock_api::GuardSend; const INIT: Self = Self(None); - fn lock_shared(&self) { - unsafe { - let res = libc::pthread_rwlock_rdlock(self.inner().as_ptr()); - if res != 0 { - panic!("rdlock failed with {}", Errno::from_raw(res)); - } - } - } - fn try_lock_shared(&self) -> bool { unsafe { let res = libc::pthread_rwlock_tryrdlock(self.inner().as_ptr()); match res { 0 => true, libc::EAGAIN => false, - _ => panic!("try_rdlock failed with {}", Errno::from_raw(res)), - } - } - } - - fn lock_exclusive(&self) { - unsafe { - let res = libc::pthread_rwlock_wrlock(self.inner().as_ptr()); - if res != 0 { - panic!("wrlock failed with 
{}", Errno::from_raw(res)); + _ => panic!( + "pthread_rwlock_tryrdlock failed with {}", + Errno::from_raw(res) + ), } } } @@ -85,20 +85,27 @@ unsafe impl lock_api::RawRwLock for PthreadRwLock { } } - unsafe fn unlock_exclusive(&self) { + fn lock_shared(&self) { unsafe { - let res = libc::pthread_rwlock_unlock(self.inner().as_ptr()); - if res != 0 { - panic!("unlock failed with {}", Errno::from_raw(res)); - } + libc_checked!(pthread_rwlock_rdlock(self.inner().as_ptr())); } } + + fn lock_exclusive(&self) { + unsafe { + libc_checked!(pthread_rwlock_wrlock(self.inner().as_ptr())); + } + } + + unsafe fn unlock_exclusive(&self) { + unsafe { + libc_checked!(pthread_rwlock_unlock(self.inner().as_ptr())); + } + } + unsafe fn unlock_shared(&self) { unsafe { - let res = libc::pthread_rwlock_unlock(self.inner().as_ptr()); - if res != 0 { - panic!("unlock failed with {}", Errno::from_raw(res)); - } + libc_checked!(pthread_rwlock_unlock(self.inner().as_ptr())); } } }