From ae740ca1bb1a997e7768b940c384013ec3469d4b Mon Sep 17 00:00:00 2001
From: David Freifeld
Date: Tue, 24 Jun 2025 16:27:17 -0700
Subject: [PATCH] Document hashmap implementation, fix `get_bucket_for_value`

Previously, `get_bucket_for_value` incorrectly divided by the size of `V` to
get the bucket index. Now it divides by the size of `Bucket`.
---
 libs/neon-shmem/Cargo.toml        |  4 --
 libs/neon-shmem/src/hash.rs       | 92 +++++++++++++++++--------
 libs/neon-shmem/src/hash/core.rs  | 29 ++++++----
 libs/neon-shmem/src/hash/entry.rs | 26 ++++-----
 libs/neon-shmem/src/hash/tests.rs |  7 ++-
 5 files changed, 86 insertions(+), 72 deletions(-)

diff --git a/libs/neon-shmem/Cargo.toml b/libs/neon-shmem/Cargo.toml
index 284a19d55d..bf14eb2e83 100644
--- a/libs/neon-shmem/Cargo.toml
+++ b/libs/neon-shmem/Cargo.toml
@@ -23,7 +23,3 @@ tempfile = "3.14.0"
 [[bench]]
 name = "hmap_resize"
 harness = false
-
-[[bin]]
-name = "hmap_test"
-path = "main.rs"
diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs
index 404a7ade50..032c1bd21f 100644
--- a/libs/neon-shmem/src/hash.rs
+++ b/libs/neon-shmem/src/hash.rs
@@ -1,18 +1,22 @@
-//! Hash table implementation on top of 'shmem'
+//! Resizable hash table implementation on top of byte-level storage (either `shmem` or a fixed byte array).
 //!
-//! Features required in the long run by the communicator project:
+//! This hash table has two major components: the bucket array and the dictionary. Each bucket within the
+//! bucket array contains an `Option<(K, V)>` and the index of another bucket. In this way there is both an
+//! implicit freelist within the bucket array (None buckets point to other None entries) and various hash
+//! chains within the bucket array (a Some bucket will point to other Some buckets that had the same hash).
 //!
-//! [X] Accessible from both Postgres processes and rust threads in the communicator process
-//! [X] Low latency
-//! [ ] Scalable to lots of concurrent accesses (currently relies on caller for locking)
-//! [ ] Resizable
+//! Buckets are never moved unless they are within a region that is being shrunk, so all hash-dependent
+//! lookups go through the dictionary. When a new key is inserted into the map, a position within the
+//! dictionary is chosen based on the key's hash, the data is placed in an empty bucket taken from the
+//! freelist, and the index of that bucket is stored in the dictionary.
+//!
+//! This map is resizable (if initialized on top of a `ShmemHandle`). Both growing and shrinking happen
+//! in-place and are, at a high level, achieved by expanding/reducing the bucket array and rebuilding the
+//! dictionary by rehashing all keys.
 
-use std::fmt::Debug;
-use std::hash::{Hash, Hasher, BuildHasher};
+use std::hash::{Hash, BuildHasher};
 use std::mem::MaybeUninit;
 
-use rustc_hash::FxBuildHasher;
-
 use crate::shmem::ShmemHandle;
 
 mod core;
@@ -21,12 +25,11 @@ pub mod entry;
 #[cfg(test)]
 mod tests;
 
-use core::{CoreHashMap, INVALID_POS};
+use core::{Bucket, CoreHashMap, INVALID_POS};
 use entry::{Entry, OccupiedEntry};
 
-
+/// Builder for a `HashMapAccess`.
 pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> {
-    // Hash table can be allocated in a fixed memory area, or in a resizeable ShmemHandle.
     shmem_handle: Option<ShmemHandle>,
     shared_ptr: *mut HashMapShared<'a, K, V>,
     shared_size: usize,
@@ -34,6 +37,7 @@ pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> {
     num_buckets: u32,
 }
 
+/// Accessor for a hash table.
 pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> {
     shmem_handle: Option<ShmemHandle>,
     shared_ptr: *mut HashMapShared<'a, K, V>,
@@ -43,16 +47,18 @@
 unsafe impl<'a, K: Sync, V: Sync, S> Sync for HashMapAccess<'a, K, V, S> {}
 unsafe impl<'a, K: Send, V: Send, S> Send for HashMapAccess<'a, K, V, S> {}
 
-impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
+impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
     pub fn with_hasher(self, hasher: S) -> HashMapInit<'a, K, V, S> {
         Self { hasher, ..self }
     }
-    
+
+    /// Loosely (over)estimate the size needed to store a hash table with `num_buckets` buckets.
     pub fn estimate_size(num_buckets: u32) -> usize {
         // add some margin to cover alignment etc.
         CoreHashMap::<K, V>::estimate_size(num_buckets) + size_of::<HashMapShared<K, V>>() + 1000
     }
-    
+
+    /// Initialize a table for writing.
     pub fn attach_writer(self) -> HashMapAccess<'a, K, V, S> {
         // carve out the HashMapShared struct from the area.
         let mut ptr: *mut u8 = self.shared_ptr.cast();
@@ -90,13 +96,13 @@ impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
         }
     }
 
+    /// Initialize a table for reading. Currently identical to `attach_writer`.
     pub fn attach_reader(self) -> HashMapAccess<'a, K, V, S> {
-        // no difference to attach_writer currently
         self.attach_writer()
     }
 }
 
-/// This is stored in the shared memory area
+/// Hash table data that is actually stored in the shared memory area.
 ///
 /// NOTE: We carve out the parts from a contiguous chunk. Growing and shrinking the hash table
 /// relies on the memory layout! The data structures are laid out in the contiguous shared memory
@@ -115,6 +121,7 @@ impl<'a, K, V> HashMapInit<'a, K, V, rustc_hash::FxBuildHasher>
 where
     K: Clone + Hash + Eq
 {
+    /// Place the hash table within a user-supplied fixed memory area.
     pub fn with_fixed(
         num_buckets: u32,
         area: &'a mut [MaybeUninit<u8>],
@@ -128,7 +135,7 @@ where
         }
     }
 
-    /// Initialize a new hash map in the given shared memory area
+    /// Place a new hash map in the given shared memory area
     pub fn with_shmem(num_buckets: u32, shmem: ShmemHandle) -> HashMapInit<'a, K, V> {
         let size = Self::estimate_size(num_buckets);
         shmem
@@ -143,6 +150,7 @@ where
         }
     }
 
+    /// Make a resizable hash map within a new shared memory area with the given name.
     pub fn new_resizeable_named(num_buckets: u32, max_buckets: u32, name: &str) -> HashMapInit<'a, K, V> {
         let size = Self::estimate_size(num_buckets);
         let max_size = Self::estimate_size(max_buckets);
@@ -158,6 +166,7 @@ where
         }
     }
 
+    /// Make a resizable hash map within a new anonymous shared memory area.
     pub fn new_resizeable(num_buckets: u32, max_buckets: u32) -> HashMapInit<'a, K, V> {
         use std::sync::atomic::{AtomicUsize, Ordering};
         const COUNTER: AtomicUsize = AtomicUsize::new(0);
@@ -171,22 +180,26 @@ impl<'a, K, V, S: BuildHasher> HashMapAccess<'a, K, V, S>
 where
     K: Clone + Hash + Eq,
 {
+    /// Hash a key using the map's hasher.
     pub fn get_hash_value(&self, key: &K) -> u64 {
         self.hasher.hash_one(key)
     }
 
+    /// Get a reference to the value for a key, given its hash.
     pub fn get_with_hash<'e>(&'e self, key: &K, hash: u64) -> Option<&'e V> {
         let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
         map.inner.get_with_hash(key, hash)
     }
 
+    /// Get the `Entry` for a key, given its hash.
     pub fn entry_with_hash(&mut self, key: K, hash: u64) -> Entry<'a, '_, K, V> {
         let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
         map.inner.entry_with_hash(key, hash)
     }
 
+    /// Remove a key given its hash. Does nothing if the key is not present.
     pub fn remove_with_hash(&mut self, key: &K, hash: u64) {
         let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
@@ -198,20 +211,23 @@ where
         };
     }
 
+    /// Return the entry for the bucket at a given index, if that bucket is occupied.
     pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<'a, '_, K, V>> {
         let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
         map.inner.entry_at_bucket(pos)
     }
 
+    /// Returns the number of buckets in the table.
     pub fn get_num_buckets(&self) -> usize {
         let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
         map.inner.get_num_buckets()
     }
 
     /// Return the key and value stored in bucket with given index. This can be used to
-    /// iterate through the hash map. (An Iterator might be nicer. The communicator's
-    /// clock algorithm needs to _slowly_ iterate through all buckets with its clock hand,
-    /// without holding a lock. If we switch to an Iterator, it must not hold the lock.)
+    /// iterate through the hash map.
+    // TODO: An Iterator might be nicer. The communicator's clock algorithm needs to
+    // _slowly_ iterate through all buckets with its clock hand, without holding a lock.
+    // If we switch to an Iterator, it must not hold the lock.
     pub fn get_at_bucket(&self, pos: usize) -> Option<&(K, V)> {
         let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
@@ -222,30 +238,33 @@ where
         bucket.inner.as_ref()
     }
 
+    /// Returns the index of the bucket that holds the given value.
     pub fn get_bucket_for_value(&self, val_ptr: *const V) -> usize {
         let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
         let origin = map.inner.buckets.as_ptr();
-        let idx = (val_ptr as usize - origin as usize) / (size_of::<V>() as usize);
+        let idx = (val_ptr as usize - origin as usize) / (size_of::<Bucket<K, V>>() as usize);
         assert!(idx < map.inner.buckets.len());
         idx
     }
 
-    // for metrics
+    /// Returns the number of occupied buckets in the table.
     pub fn get_num_buckets_in_use(&self) -> usize {
         let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
         map.inner.buckets_in_use as usize
     }
 
+    /// Clears all entries in the table. Does not reset any shrinking operations.
     pub fn clear(&mut self) {
         let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
         let inner = &mut map.inner;
         inner.clear()
     }
 
-    /// Helper function that abstracts the common logic between growing and shrinking.
-    /// The only significant difference in the rehashing step is how many buckets to rehash.
+    /// Perform an in-place rehash of the region `0..rehash_buckets` of the table and reset
+    /// the `buckets` and `dictionary` slices to cover `num_buckets` buckets. Rebuilds the
+    /// freelist in the process.
     fn rehash_dict(
         &mut self,
         inner: &mut CoreHashMap<'a, K, V>,
@@ -256,7 +275,6 @@ where
     ) {
         inner.free_head = INVALID_POS;
 
-        // Recalculate the dictionary
         let buckets;
         let dictionary;
         unsafe {
@@ -287,12 +305,11 @@ where
             dictionary[pos] = i as u32;
         }
 
-        // Finally, update the CoreHashMap struct
         inner.dictionary = dictionary;
         inner.buckets = buckets;
     }
 
-    /// Rehash the map. Intended for benchmarking only.
+    /// Rehash the map without growing or shrinking.
     pub fn shuffle(&mut self) {
         let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
         let inner = &mut map.inner;
@@ -303,11 +320,11 @@ where
         self.rehash_dict(inner, buckets_ptr, end_ptr, num_buckets, num_buckets);
     }
 
-    /// Grow
+    /// Grow the number of buckets within the table.
     ///
-    /// 1. grow the underlying shared memory area
-    /// 2. Initialize new buckets. This overwrites the current dictionary
-    /// 3. Recalculate the dictionary
+    /// 1. Grows the underlying shared memory area
+    /// 2. Initializes new buckets, overwriting the current dictionary
+    /// 3. Rebuilds the dictionary by rehashing
     pub fn grow(&mut self, num_buckets: u32) -> Result<(), crate::shmem::Error> {
         let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
         let inner = &mut map.inner;
@@ -400,15 +417,6 @@ where
             panic!("called finish_shrink before enough entries were removed");
         }
 
-        let mut open_spots = 0;
-        let mut curr = inner.free_head;
-        while curr != INVALID_POS {
-            if curr < num_buckets {
-                open_spots += 1;
-            }
-            curr = inner.buckets[curr as usize].next;
-        }
-
         for i in (num_buckets as usize)..inner.buckets.len() {
             if let Some((k, v)) = inner.buckets[i].inner.take() {
                 // alloc bucket increases buckets in use, so need to decrease since we're just moving
diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs
index b27180a80a..22c44f20ac 100644
--- a/libs/neon-shmem/src/hash/core.rs
+++ b/libs/neon-shmem/src/hash/core.rs
@@ -1,7 +1,4 @@
-//! Simple hash table with chaining
-//!
-//! # Resizing
-//!
+//! Simple hash table with chaining.
 
 use std::hash::Hash;
 use std::mem::MaybeUninit;
@@ -10,12 +7,16 @@ use crate::hash::entry::{Entry, OccupiedEntry, PrevPos, VacantEntry};
 
 pub(crate) const INVALID_POS: u32 = u32::MAX;
 
-// Bucket
+/// Fundamental storage unit within the hash table. Either empty or contains a key-value pair.
+/// Always part of a chain of some kind (either a freelist if empty or a hash chain if full).
 pub(crate) struct Bucket<K, V> {
+    /// Index of the next bucket in the chain.
     pub(crate) next: u32,
+    /// Key-value pair contained within the bucket.
     pub(crate) inner: Option<(K, V)>,
 }
 
+/// Core hash table implementation.
 pub(crate) struct CoreHashMap<'a, K, V> {
     /// Dictionary used to map hashes to bucket indices.
     pub(crate) dictionary: &'a mut [u32],
@@ -23,15 +24,15 @@ pub(crate) struct CoreHashMap<'a, K, V> {
     pub(crate) buckets: &'a mut [Bucket<K, V>],
     /// Head of the freelist.
     pub(crate) free_head: u32,
-    
-    pub(crate) _user_list_head: u32,
     /// Maximum index of a bucket allowed to be allocated. INVALID_POS if no limit.
     pub(crate) alloc_limit: u32,
-    
-    // metrics
+    /// The number of currently occupied buckets.
     pub(crate) buckets_in_use: u32,
+    // Unclear what the purpose of this is.
+    pub(crate) _user_list_head: u32,
 }
 
+/// Error for when there are no empty buckets left but one is needed.
 #[derive(Debug)]
 pub struct FullError();
 
@@ -41,6 +42,7 @@ where
 {
     const FILL_FACTOR: f32 = 0.60;
 
+    /// Estimate the size of the data contained within the hash map.
     pub fn estimate_size(num_buckets: u32) -> usize {
         let mut size = 0;
 
@@ -92,6 +94,7 @@ where
         }
     }
 
+    /// Get the value associated with a key (if it exists) given its hash.
     pub fn get_with_hash(&self, key: &K, hash: u64) -> Option<&V> {
         let mut next = self.dictionary[hash as usize % self.dictionary.len()];
         loop {
@@ -108,7 +111,7 @@ where
         }
     }
 
-    // all updates are done through Entry
+    /// Get the `Entry` associated with a key given its hash. This should be used for updates/inserts.
     pub fn entry_with_hash(&mut self, key: K, hash: u64) -> Entry<'a, '_, K, V> {
         let dict_pos = hash as usize % self.dictionary.len();
         let first = self.dictionary[dict_pos];
@@ -149,15 +152,18 @@ where
         }
     }
 
+    /// Get the number of buckets in the map.
     pub fn get_num_buckets(&self) -> usize {
         self.buckets.len()
     }
 
+    /// Returns whether there is an ongoing shrink operation.
    pub fn is_shrinking(&self) -> bool {
         self.alloc_limit != INVALID_POS
     }
 
     /// Clears all entries from the hashmap.
+    ///
     /// Does not reset any allocation limits, but does clear any entries beyond them.
     pub fn clear(&mut self) {
         for i in 0..self.buckets.len() {
@@ -177,7 +183,8 @@ where
         self.buckets_in_use = 0;
     }
-    
+
+    /// Gets the entry at the given index, if that bucket is occupied.
     pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<'a, '_, K, V>> {
         if pos >= self.buckets.len() {
             return None;
diff --git a/libs/neon-shmem/src/hash/entry.rs b/libs/neon-shmem/src/hash/entry.rs
index 63af840c5d..24c124189b 100644
--- a/libs/neon-shmem/src/hash/entry.rs
+++ b/libs/neon-shmem/src/hash/entry.rs
@@ -5,12 +5,13 @@ use crate::hash::core::{CoreHashMap, FullError, INVALID_POS};
 use std::hash::Hash;
 use std::mem;
 
+/// View into an entry in the map (either vacant or occupied).
 pub enum Entry<'a, 'b, K, V> {
-    Occupied(OccupiedEntry<'a, 'b, K, V>),
+    Occupied(OccupiedEntry<'a, 'b, K, V>),
     Vacant(VacantEntry<'a, 'b, K, V>),
 }
 
-/// Helper enum representing the previous position within a hashmap chain.
+/// Enum representing the previous position within a chain.
 #[derive(Clone, Copy)]
 pub(crate) enum PrevPos {
     /// Starting index within the dictionary.
@@ -21,17 +22,9 @@ pub(crate) enum PrevPos {
     Unknown,
 }
 
-impl PrevPos {
-    /// Unwrap an index from a `PrevPos::First`, panicking otherwise.
-    pub fn unwrap_first(&self) -> u32 {
-        match self {
-            Self::First(i) => *i,
-            _ => panic!("not first entry in chain")
-        }
-    }
-}
-
+/// View into an occupied entry within the map.
 pub struct OccupiedEntry<'a, 'b, K, V> {
+    /// Mutable reference to the map containing this entry.
     pub(crate) map: &'b mut CoreHashMap<'a, K, V>,
     /// The key of the occupied entry
     pub(crate) _key: K,
@@ -58,6 +51,7 @@ impl<'a, 'b, K, V> OccupiedEntry<'a, 'b, K, V> {
             .1
     }
 
+    /// Inserts a value into the entry, replacing (and returning) the existing value.
     pub fn insert(&mut self, value: V) -> V {
         let bucket = &mut self.map.buckets[self.bucket_pos as usize];
         // This assumes inner is Some, which it must be for an OccupiedEntry
         old_value
     }
 
+    /// Removes the entry from the hash map, returning the value originally stored within it.
     pub fn remove(self) -> V {
         // CoreHashMap::remove returns Option<(K, V)>. We know it's Some for an OccupiedEntry.
         let bucket = &mut self.map.buckets[self.bucket_pos as usize];
@@ -89,13 +84,18 @@ impl<'a, 'b, K, V> OccupiedEntry<'a, 'b, K, V> {
     }
 }
 
+/// View into a vacant entry within the map.
 pub struct VacantEntry<'a, 'b, K, V> {
+    /// Mutable reference to the map containing this entry.
     pub(crate) map: &'b mut CoreHashMap<'a, K, V>,
-    pub(crate) key: K, // The key to insert
+    /// The key to be inserted into this entry.
+    pub(crate) key: K,
+    /// The position within the dictionary corresponding to the key's hash.
     pub(crate) dict_pos: u32,
 }
 
 impl<'a, 'b, K: Clone + Hash + Eq, V> VacantEntry<'a, 'b, K, V> {
+    /// Insert a value into the vacant entry, finding and populating an empty bucket in the process.
     pub fn insert(self, value: V) -> Result<&'b mut V, FullError> {
         let pos = self.map.alloc_bucket(self.key, value)?;
         if pos == INVALID_POS {
diff --git a/libs/neon-shmem/src/hash/tests.rs b/libs/neon-shmem/src/hash/tests.rs
index a522423db1..8a6e8a0a29 100644
--- a/libs/neon-shmem/src/hash/tests.rs
+++ b/libs/neon-shmem/src/hash/tests.rs
@@ -303,15 +303,18 @@ fn test_bucket_ops() {
     assert_eq!(writer.get_num_buckets_in_use(), 1);
     assert_eq!(writer.get_num_buckets(), 1000);
     assert_eq!(writer.get_with_hash(&1.into(), hash), Some(&2));
-    match writer.entry_with_hash(1.into(), hash) {
+    let pos = match writer.entry_with_hash(1.into(), hash) {
         Entry::Occupied(e) => {
             assert_eq!(e._key, 1.into());
             let pos = e.bucket_pos as usize;
             assert_eq!(writer.entry_at_bucket(pos).unwrap()._key, 1.into());
             assert_eq!(writer.get_at_bucket(pos), Some(&(1.into(), 2)));
+            pos
         },
         Entry::Vacant(_) => {
             panic!("Insert didn't affect entry");
         },
-    }
+    };
+    let ptr: *const usize = writer.get_with_hash(&1.into(), hash).unwrap();
+    assert_eq!(writer.get_bucket_for_value(ptr), pos);
     writer.remove_with_hash(&1.into(), hash);
     assert_eq!(writer.get_with_hash(&1.into(), hash), None);
 }
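
Usage sketch (not part of the patch): a minimal example of the API this change documents, written only against functions visible in the diff above (`new_resizeable`, `attach_writer`, `get_hash_value`, `entry_with_hash`, `get_with_hash`, `remove_with_hash`). The `neon_shmem::hash` import path and the `u64` key/value types are assumptions made for illustration; callers must supply a key type satisfying `K: Clone + Hash + Eq` and handle locking themselves.

    // Illustrative sketch only -- assumes the crate is used as `neon_shmem` and that
    // the `hash` module is public. u64 keys/values satisfy the K: Clone + Hash + Eq bound.
    use neon_shmem::hash::HashMapInit;
    use neon_shmem::hash::entry::Entry;

    fn main() {
        // Resizable map in an anonymous shared memory area: 1024 buckets now,
        // allowed to grow up to 4096 later via `grow`.
        let mut writer = HashMapInit::<u64, u64>::new_resizeable(1024, 4096).attach_writer();

        // Hash the key once; every operation below takes the precomputed hash.
        let key = 42u64;
        let hash = writer.get_hash_value(&key);

        // All updates go through the Entry API: a vacant entry pulls a bucket off the
        // freelist and records its index in the dictionary, an occupied entry is
        // updated in place.
        match writer.entry_with_hash(key, hash) {
            Entry::Occupied(mut e) => {
                e.insert(7);
            }
            Entry::Vacant(e) => {
                e.insert(7).expect("no free buckets left");
            }
        }

        assert_eq!(writer.get_with_hash(&key, hash), Some(&7));
        writer.remove_with_hash(&key, hash);
        assert_eq!(writer.get_with_hash(&key, hash), None);
    }

Note how the caller hashes the key once with `get_hash_value` and threads that hash through every subsequent call; this is the same pattern `test_bucket_ops` above follows.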