From b6b122e07b3e20f57684046c7202a2ed5daab821 Mon Sep 17 00:00:00 2001
From: David Freifeld
Date: Mon, 16 Jun 2025 10:20:30 -0700
Subject: [PATCH] nw: add shrinking and deletion skeletons

---
 libs/neon-shmem/src/hash.rs      | 48 ++++++++++++++++++++++++++++++++
 libs/neon-shmem/src/hash/core.rs | 45 ++++++++++++++++++++++++------
 2 files changed, 84 insertions(+), 9 deletions(-)

diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs
index ae53e2ec41..9b1c1cee89 100644
--- a/libs/neon-shmem/src/hash.rs
+++ b/libs/neon-shmem/src/hash.rs
@@ -292,6 +292,54 @@ where
         Ok(())
     }
 
+    fn begin_shrink(&mut self, num_buckets: u32) {
+        let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
+        // Shrinking must not grow the table: reject a target larger than the
+        // current bucket count.
+        if num_buckets > map.inner.get_num_buckets() as u32 {
+            panic!("shrink called with a larger number of buckets");
+        }
+        map.inner.alloc_limit = num_buckets;
+    }
+
+    fn finish_shrink(&mut self) -> Result<(), crate::shmem::Error> {
+        let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
+        let inner = &mut map.inner;
+        if !inner.is_shrinking() {
+            panic!("called finish_shrink when no shrink is in progress");
+        }
+
+        let new_num_buckets = inner.alloc_limit;
+
+        if inner.get_num_buckets() == new_num_buckets as usize {
+            return Ok(());
+        }
+
+        for b in &inner.buckets[new_num_buckets as usize..] {
+            if b.inner.is_some() {
+                // TODO(quantumish): Do we want to treat this as a violation of an
+                // invariant or a legitimate error the caller can run into? Originally
+                // I thought this could return something like an UnevictedError(index)
+                // as soon as it runs into one (that way a caller could clear their
+                // soon-to-be-shrunk buckets by repeatedly calling `finish_shrink`).
+                //
+                // Would require making a wider error enum covering both this and the
+                // shmem errors.
+                panic!("unevicted entries in shrunk space")
+            }
+        }
+
+        let shmem_handle = self
+            .shmem_handle
+            .as_ref()
+            .expect("shrink called on a fixed-size hash table");
+
+        let size_bytes = HashMapInit::<K, V>::estimate_size(new_num_buckets);
+        shmem_handle.set_size(size_bytes)?;
+        // Placeholders for the upcoming truncation of the bucket slice; unused
+        // until that logic lands.
+        let _end_ptr: *mut u8 = unsafe { shmem_handle.data_ptr.as_ptr().add(size_bytes) };
+        let _buckets_ptr = inner.buckets.as_mut_ptr();
+        // TODO: reset alloc_limit once the bucket slice has actually been truncated;
+        // otherwise is_shrinking() keeps returning true after a successful shrink.
+
+        Ok(())
+    }
+
     // TODO: Shrinking is a multi-step process that requires co-operation from the caller
     //
     // 1. The caller must first call begin_shrink(). That forbids allocation of higher-numbered
diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs
index f4ff8ed2c4..eb60e21bad 100644
--- a/libs/neon-shmem/src/hash/core.rs
+++ b/libs/neon-shmem/src/hash/core.rs
@@ -22,6 +22,7 @@ pub(crate) struct CoreHashMap<'a, K, V> {
 
     pub(crate) free_head: u32,
     pub(crate) _user_list_head: u32,
+    pub(crate) alloc_limit: u32,
 
     // metrics
     pub(crate) buckets_in_use: u32,
@@ -83,6 +84,7 @@ where
             free_head: 0,
             buckets_in_use: 0,
             _user_list_head: INVALID_POS,
+            alloc_limit: INVALID_POS,
         }
     }
 
@@ -147,25 +149,50 @@ where
         self.buckets.len()
     }
 
+    pub fn is_shrinking(&self) -> bool {
+        self.alloc_limit != INVALID_POS
+    }
+
     pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<'_, K, V>> {
         if pos >= self.buckets.len() {
            return None;
         }
-        todo!()
-        //self.buckets[pos].inner.as_ref()
+        let (key, _) = self.buckets[pos].inner.as_ref()?;
+        let key = key.clone(); // TODO(quantumish): clone unavoidable?
+        Some(OccupiedEntry {
+            _key: key,
+            bucket_pos: pos as u32,
+            map: self,
+            // TODO(quantumish): possibly needs an O(n) traversal to rediscover - costly!
+            prev_pos: todo!(),
+        })
+    }
 
     pub(crate) fn alloc_bucket(&mut self, key: K, value: V) -> Result<u32, FullError> {
-        let pos = self.free_head;
-        if pos == INVALID_POS {
-            return Err(FullError());
-        }
+        let mut pos = self.free_head;
 
-        let bucket = &mut self.buckets[pos as usize];
-        self.free_head = bucket.next;
-        self.buckets_in_use += 1;
+        // Walk the free list past any buckets at or above alloc_limit; those
+        // must not be handed out while a shrink is in progress.
+        //
+        // TODO(quantumish): relies on INVALID_POS being u32::MAX, so the
+        // end-of-list sentinel also satisfies `pos >= alloc_limit` and gets
+        // caught inside the loop. Instead add a clause `pos != INVALID_POS`?
+        let mut prev = PrevPos::First(self.free_head);
+        while pos >= self.alloc_limit {
+            if pos == INVALID_POS {
+                return Err(FullError());
+            }
+            prev = PrevPos::Chained(pos);
+            pos = self.buckets[pos as usize].next;
+        }
+
+        // Unlink the chosen bucket from the free list.
+        let next = self.buckets[pos as usize].next;
+        match prev {
+            PrevPos::First(_) => self.free_head = next,
+            PrevPos::Chained(p) => self.buckets[p as usize].next = next,
+        }
+        self.buckets_in_use += 1;
+
+        let bucket = &mut self.buckets[pos as usize];
         bucket.next = INVALID_POS;
         bucket.inner = Some((key, value));
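
The TODO block at the end of the hash.rs hunk describes the intended three-step
protocol: begin_shrink() caps allocation, the caller evicts everything above the
cap, and finish_shrink() releases the memory. Below is a minimal, self-contained
sketch of those mechanics as a sanity check; ToyMap, Bucket, the u64 key/value
types, and the direct `inner = None` eviction are all invented for illustration
and are not the neon-shmem API:

const INVALID_POS: u32 = u32::MAX;

/// One table slot: `next` chains free buckets, `inner` holds an occupied entry.
struct Bucket {
    next: u32,
    inner: Option<(u64, u64)>,
}

/// Toy stand-in for CoreHashMap's free list + alloc_limit mechanics.
struct ToyMap {
    buckets: Vec<Bucket>,
    free_head: u32,
    alloc_limit: u32,
}

impl ToyMap {
    fn new(n: u32) -> Self {
        // Chain all buckets into the free list: 0 -> 1 -> ... -> n-1 -> INVALID_POS.
        let buckets = (0..n)
            .map(|i| Bucket {
                next: if i + 1 < n { i + 1 } else { INVALID_POS },
                inner: None,
            })
            .collect();
        ToyMap { buckets, free_head: 0, alloc_limit: INVALID_POS }
    }

    /// Pop the first free bucket below alloc_limit, unlinking it from the list.
    fn alloc_bucket(&mut self, key: u64, value: u64) -> Option<u32> {
        let mut prev = INVALID_POS; // INVALID_POS here: "free_head is the predecessor"
        let mut pos = self.free_head;
        // INVALID_POS == u32::MAX always satisfies `pos >= alloc_limit`,
        // so the end-of-list sentinel is caught inside the loop.
        while pos >= self.alloc_limit {
            if pos == INVALID_POS {
                return None; // no free bucket below the limit
            }
            prev = pos;
            pos = self.buckets[pos as usize].next;
        }
        let next = self.buckets[pos as usize].next;
        if prev == INVALID_POS {
            self.free_head = next;
        } else {
            self.buckets[prev as usize].next = next;
        }
        self.buckets[pos as usize] = Bucket { next: INVALID_POS, inner: Some((key, value)) };
        Some(pos)
    }

    /// Step 1: forbid handing out buckets at or above the target size.
    fn begin_shrink(&mut self, n: u32) {
        assert!(n as usize <= self.buckets.len());
        self.alloc_limit = n;
    }

    /// Step 3: succeeds only once the caller has evicted the doomed tail.
    fn finish_shrink(&mut self) -> Result<(), u32> {
        assert_ne!(self.alloc_limit, INVALID_POS);
        for (i, b) in self.buckets.iter().enumerate().skip(self.alloc_limit as usize) {
            if b.inner.is_some() {
                return Err(i as u32); // report the first unevicted entry
            }
        }
        self.buckets.truncate(self.alloc_limit as usize);
        self.alloc_limit = INVALID_POS;
        Ok(())
    }
}

fn main() {
    let mut map = ToyMap::new(8);
    for k in 0..8 {
        let _ = map.alloc_bucket(k, k * 10);
    }
    map.begin_shrink(4);                       // step 1: cap allocations
    assert!(map.finish_shrink().is_err());     // tail still occupied
    for pos in 4..8 {
        map.buckets[pos].inner = None;         // step 2: caller evicts the tail
    }
    map.finish_shrink().unwrap();              // step 3: release the tail
    assert_eq!(map.buckets.len(), 4);
    assert!(map.alloc_bucket(9, 90).is_none()); // all remaining buckets are in use
}

Note that the free-list walk terminates on the end-of-list sentinel only because
INVALID_POS is u32::MAX and can therefore never be below alloc_limit; that
ordering trick is exactly what the TODO in the real alloc_bucket is about, and an
explicit `pos != INVALID_POS` clause in the loop condition would make the walk
independent of the sentinel's numeric value.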