Document hashmap implementation, fix get_bucket_for_value

Previously, `get_bucket_for_value` incorrectly divided by the size of
`V` to get the bucket index. Now it divides by the size of `Bucket<K,V>`.
This commit is contained in:
David Freifeld
2025-06-24 16:27:17 -07:00
parent 24e6c68772
commit ae740ca1bb
5 changed files with 86 additions and 72 deletions

View File

@@ -23,7 +23,3 @@ tempfile = "3.14.0"
[[bench]]
name = "hmap_resize"
harness = false
[[bin]]
name = "hmap_test"
path = "main.rs"

View File

@@ -1,18 +1,22 @@
//! Hash table implementation on top of 'shmem'
//! Resizable hash table implementation on top of byte-level storage (either `shmem` or fixed byte array).
//!
//! Features required in the long run by the communicator project:
//! This hash table has two major components: the bucket array and the dictionary. Each bucket within the
//! bucket array contains a Option<(K, V)> and an index of another bucket. In this way there is both an
//! implicit freelist within the bucket array (None buckets point to other None entries) and various hash
//! chains within the bucket array (a Some bucket will point to other Some buckets that had the same hash).
//!
//! [X] Accessible from both Postgres processes and rust threads in the communicator process
//! [X] Low latency
//! [ ] Scalable to lots of concurrent accesses (currently relies on caller for locking)
//! [ ] Resizable
//! Buckets are never moved unless they are within a region that is being shrunk, and so the actual hash-
//! dependent component is done with the dictionary. When a new key is inserted into the map, a position
//! within the dictionary is decided based on its hash, the data is inserted into an empty bucket based
//! off of the freelist, and then the index of said bucket is placed in the dictionary.
//!
//! This map is resizable (if initialized on top of a `ShmemHandle`). Both growing and shrinking happen
//! in-place and are at a high level achieved by expanding/reducing the bucket array and rebuilding the
//! dictionary by rehashing all keys.
use std::fmt::Debug;
use std::hash::{Hash, Hasher, BuildHasher};
use std::hash::{Hash, BuildHasher};
use std::mem::MaybeUninit;
use rustc_hash::FxBuildHasher;
use crate::shmem::ShmemHandle;
mod core;
@@ -21,12 +25,11 @@ pub mod entry;
#[cfg(test)]
mod tests;
use core::{CoreHashMap, INVALID_POS};
use core::{Bucket, CoreHashMap, INVALID_POS};
use entry::{Entry, OccupiedEntry};
/// Builder for a `HashMapAccess`.
pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> {
// Hash table can be allocated in a fixed memory area, or in a resizeable ShmemHandle.
shmem_handle: Option<ShmemHandle>,
shared_ptr: *mut HashMapShared<'a, K, V>,
shared_size: usize,
@@ -34,6 +37,7 @@ pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> {
num_buckets: u32,
}
/// Accessor for a hash table.
pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> {
shmem_handle: Option<ShmemHandle>,
shared_ptr: *mut HashMapShared<'a, K, V>,
@@ -43,16 +47,18 @@ pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> {
unsafe impl<'a, K: Sync, V: Sync, S> Sync for HashMapAccess<'a, K, V, S> {}
unsafe impl<'a, K: Send, V: Send, S> Send for HashMapAccess<'a, K, V, S> {}
impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
pub fn with_hasher(self, hasher: S) -> HashMapInit<'a, K, V, S> {
Self { hasher, ..self }
}
/// Loosely (over)estimate the size needed to store a hash table with `num_buckets` buckets.
pub fn estimate_size(num_buckets: u32) -> usize {
// add some margin to cover alignment etc.
CoreHashMap::<K, V>::estimate_size(num_buckets) + size_of::<HashMapShared<K, V>>() + 1000
}
/// Initialize a table for writing.
pub fn attach_writer(self) -> HashMapAccess<'a, K, V, S> {
// carve out the HashMapShared struct from the area.
let mut ptr: *mut u8 = self.shared_ptr.cast();
@@ -90,13 +96,13 @@ impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
}
}
/// Initialize a table for reading. Currently identical to `attach_writer`.
pub fn attach_reader(self) -> HashMapAccess<'a, K, V, S> {
// no difference to attach_writer currently
self.attach_writer()
}
}
/// This is stored in the shared memory area
/// Hash table data that is actually stored in the shared memory area.
///
/// NOTE: We carve out the parts from a contiguous chunk. Growing and shrinking the hash table
/// relies on the memory layout! The data structures are laid out in the contiguous shared memory
@@ -115,6 +121,7 @@ impl<'a, K, V> HashMapInit<'a, K, V, rustc_hash::FxBuildHasher>
where
K: Clone + Hash + Eq
{
/// Place the hash table within a user-supplied fixed memory area.
pub fn with_fixed(
num_buckets: u32,
area: &'a mut [MaybeUninit<u8>],
@@ -128,7 +135,7 @@ where
}
}
/// Initialize a new hash map in the given shared memory area
/// Place a new hash map in the given shared memory area
pub fn with_shmem(num_buckets: u32, shmem: ShmemHandle) -> HashMapInit<'a, K, V> {
let size = Self::estimate_size(num_buckets);
shmem
@@ -143,6 +150,7 @@ where
}
}
/// Make a resizable hash map within a new shared memory area with the given name.
pub fn new_resizeable_named(num_buckets: u32, max_buckets: u32, name: &str) -> HashMapInit<'a, K, V> {
let size = Self::estimate_size(num_buckets);
let max_size = Self::estimate_size(max_buckets);
@@ -158,6 +166,7 @@ where
}
}
/// Make a resizable hash map within a new anonymous shared memory area.
pub fn new_resizeable(num_buckets: u32, max_buckets: u32) -> HashMapInit<'a, K, V> {
use std::sync::atomic::{AtomicUsize, Ordering};
const COUNTER: AtomicUsize = AtomicUsize::new(0);
@@ -171,22 +180,26 @@ impl<'a, K, V, S: BuildHasher> HashMapAccess<'a, K, V, S>
where
K: Clone + Hash + Eq,
{
/// Hash a key using the map's hasher.
pub fn get_hash_value(&self, key: &K) -> u64 {
self.hasher.hash_one(key)
}
/// Get a reference to the corresponding value for a key given its hash.
pub fn get_with_hash<'e>(&'e self, key: &K, hash: u64) -> Option<&'e V> {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
map.inner.get_with_hash(key, hash)
}
/// Get a reference to the entry containing a key given its hash.
pub fn entry_with_hash(&mut self, key: K, hash: u64) -> Entry<'a, '_, K, V> {
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
map.inner.entry_with_hash(key, hash)
}
/// Remove a key given its hash. Does nothing if key is not present.
pub fn remove_with_hash(&mut self, key: &K, hash: u64) {
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
@@ -198,20 +211,23 @@ where
};
}
/// Optionally return the entry for a bucket at a given index if it exists.
pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<'a, '_, K, V>> {
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
map.inner.entry_at_bucket(pos)
}
/// Returns the number of buckets in the table.
pub fn get_num_buckets(&self) -> usize {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
map.inner.get_num_buckets()
}
/// Return the key and value stored in bucket with given index. This can be used to
/// iterate through the hash map. (An Iterator might be nicer. The communicator's
/// clock algorithm needs to _slowly_ iterate through all buckets with its clock hand,
/// without holding a lock. If we switch to an Iterator, it must not hold the lock.)
/// iterate through the hash map.
// TODO: An Iterator might be nicer. The communicator's clock algorithm needs to
// _slowly_ iterate through all buckets with its clock hand, without holding a lock.
// If we switch to an Iterator, it must not hold the lock.
pub fn get_at_bucket(&self, pos: usize) -> Option<&(K, V)> {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
@@ -222,30 +238,33 @@ where
bucket.inner.as_ref()
}
/// Returns the index of the bucket a given value corresponds to.
pub fn get_bucket_for_value(&self, val_ptr: *const V) -> usize {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
let origin = map.inner.buckets.as_ptr();
let idx = (val_ptr as usize - origin as usize) / (size_of::<V>() as usize);
let idx = (val_ptr as usize - origin as usize) / (size_of::<Bucket<K, V>>() as usize);
assert!(idx < map.inner.buckets.len());
idx
}
// for metrics
/// Returns the number of occupied buckets in the table.
pub fn get_num_buckets_in_use(&self) -> usize {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap();
map.inner.buckets_in_use as usize
}
/// Clears all entries in a table. Does not reset any shrinking operations.
pub fn clear(&mut self) {
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
let inner = &mut map.inner;
inner.clear()
}
/// Helper function that abstracts the common logic between growing and shrinking.
/// The only significant difference in the rehashing step is how many buckets to rehash.
/// Perform an in-place rehash of some region (0..`rehash_buckets`) of the table and reset
/// the `buckets` and `dictionary` slices to be as long as `num_buckets`. Resets the freelist
/// in the process.
fn rehash_dict(
&mut self,
inner: &mut CoreHashMap<'a, K, V>,
@@ -256,7 +275,6 @@ where
) {
inner.free_head = INVALID_POS;
// Recalculate the dictionary
let buckets;
let dictionary;
unsafe {
@@ -287,12 +305,11 @@ where
dictionary[pos] = i as u32;
}
// Finally, update the CoreHashMap struct
inner.dictionary = dictionary;
inner.buckets = buckets;
}
/// Rehash the map. Intended for benchmarking only.
/// Rehash the map without growing or shrinking.
pub fn shuffle(&mut self) {
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
let inner = &mut map.inner;
@@ -303,11 +320,11 @@ where
self.rehash_dict(inner, buckets_ptr, end_ptr, num_buckets, num_buckets);
}
/// Grow
/// Grow the number of buckets within the table.
///
/// 1. grow the underlying shared memory area
/// 2. Initialize new buckets. This overwrites the current dictionary
/// 3. Recalculate the dictionary
/// 1. Grows the underlying shared memory area
/// 2. Initializes new buckets and overwrites the current dictionary
/// 3. Rehashes the dictionary
pub fn grow(&mut self, num_buckets: u32) -> Result<(), crate::shmem::Error> {
let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
let inner = &mut map.inner;
@@ -400,15 +417,6 @@ where
panic!("called finish_shrink before enough entries were removed");
}
let mut open_spots = 0;
let mut curr = inner.free_head;
while curr != INVALID_POS {
if curr < num_buckets {
open_spots += 1;
}
curr = inner.buckets[curr as usize].next;
}
for i in (num_buckets as usize)..inner.buckets.len() {
if let Some((k, v)) = inner.buckets[i].inner.take() {
// alloc bucket increases buckets in use, so need to decrease since we're just moving

View File

@@ -1,7 +1,4 @@
//! Simple hash table with chaining
//!
//! # Resizing
//!
//! Simple hash table with chaining.
use std::hash::Hash;
use std::mem::MaybeUninit;
@@ -10,12 +7,16 @@ use crate::hash::entry::{Entry, OccupiedEntry, PrevPos, VacantEntry};
pub(crate) const INVALID_POS: u32 = u32::MAX;
// Bucket
/// Fundamental storage unit within the hash table. Either empty or contains a key-value pair.
/// Always part of a chain of some kind (either a freelist if empty or a hash chain if full).
pub(crate) struct Bucket<K, V> {
/// Index of next bucket in the chain.
pub(crate) next: u32,
/// Key-value pair contained within bucket.
pub(crate) inner: Option<(K, V)>,
}
/// Core hash table implementation.
pub(crate) struct CoreHashMap<'a, K, V> {
/// Dictionary used to map hashes to bucket indices.
pub(crate) dictionary: &'a mut [u32],
@@ -23,15 +24,15 @@ pub(crate) struct CoreHashMap<'a, K, V> {
pub(crate) buckets: &'a mut [Bucket<K, V>],
/// Head of the freelist.
pub(crate) free_head: u32,
pub(crate) _user_list_head: u32,
/// Maximum index of a bucket allowed to be allocated. INVALID_POS if no limit.
pub(crate) alloc_limit: u32,
// metrics
/// The number of currently occupied buckets.
pub(crate) buckets_in_use: u32,
// Unclear what the purpose of this is.
pub(crate) _user_list_head: u32,
}
/// Error for when there are no empty buckets left but one is needed.
#[derive(Debug)]
pub struct FullError();
@@ -41,6 +42,7 @@ where
{
const FILL_FACTOR: f32 = 0.60;
    /// Estimate the size of data contained within the hash map.
pub fn estimate_size(num_buckets: u32) -> usize {
let mut size = 0;
@@ -92,6 +94,7 @@ where
}
}
/// Get the value associated with a key (if it exists) given its hash.
pub fn get_with_hash(&self, key: &K, hash: u64) -> Option<&V> {
let mut next = self.dictionary[hash as usize % self.dictionary.len()];
loop {
@@ -108,7 +111,7 @@ where
}
}
// all updates are done through Entry
    /// Get the `Entry` associated with a key given its hash. This should be used for updates/inserts.
pub fn entry_with_hash(&mut self, key: K, hash: u64) -> Entry<'a, '_, K, V> {
let dict_pos = hash as usize % self.dictionary.len();
let first = self.dictionary[dict_pos];
@@ -149,15 +152,18 @@ where
}
}
/// Get number of buckets in map.
pub fn get_num_buckets(&self) -> usize {
self.buckets.len()
}
/// Returns whether there is an ongoing shrink operation.
pub fn is_shrinking(&self) -> bool {
self.alloc_limit != INVALID_POS
}
/// Clears all entries from the hashmap.
///
/// Does not reset any allocation limits, but does clear any entries beyond them.
pub fn clear(&mut self) {
for i in 0..self.buckets.len() {
@@ -177,7 +183,8 @@ where
self.buckets_in_use = 0;
}
/// Optionally gets the entry at an index if it is occupied.
pub fn entry_at_bucket(&mut self, pos: usize) -> Option<OccupiedEntry<'a, '_, K, V>> {
if pos >= self.buckets.len() {
return None;

View File

@@ -5,12 +5,13 @@ use crate::hash::core::{CoreHashMap, FullError, INVALID_POS};
use std::hash::Hash;
use std::mem;
/// View into an entry in the map (either vacant or occupied).
pub enum Entry<'a, 'b, K, V> {
Occupied(OccupiedEntry<'a, 'b, K, V>),
Occupied(OccupiedEntry<'a, 'b, K, V>),
Vacant(VacantEntry<'a, 'b, K, V>),
}
/// Helper enum representing the previous position within a hashmap chain.
/// Enum representing the previous position within a chain.
#[derive(Clone, Copy)]
pub(crate) enum PrevPos {
/// Starting index within the dictionary.
@@ -21,17 +22,9 @@ pub(crate) enum PrevPos {
Unknown,
}
impl PrevPos {
/// Unwrap an index from a `PrevPos::First`, panicking otherwise.
pub fn unwrap_first(&self) -> u32 {
match self {
Self::First(i) => *i,
_ => panic!("not first entry in chain")
}
}
}
/// View into an occupied entry within the map.
pub struct OccupiedEntry<'a, 'b, K, V> {
/// Mutable reference to the map containing this entry.
pub(crate) map: &'b mut CoreHashMap<'a, K, V>,
/// The key of the occupied entry
pub(crate) _key: K,
@@ -58,6 +51,7 @@ impl<'a, 'b, K, V> OccupiedEntry<'a, 'b, K, V> {
.1
}
/// Inserts a value into the entry, replacing (and returning) the existing value.
pub fn insert(&mut self, value: V) -> V {
let bucket = &mut self.map.buckets[self.bucket_pos as usize];
// This assumes inner is Some, which it must be for an OccupiedEntry
@@ -65,6 +59,7 @@ impl<'a, 'b, K, V> OccupiedEntry<'a, 'b, K, V> {
old_value
}
/// Removes the entry from the hash map, returning the value originally stored within it.
pub fn remove(self) -> V {
// CoreHashMap::remove returns Option<(K, V)>. We know it's Some for an OccupiedEntry.
let bucket = &mut self.map.buckets[self.bucket_pos as usize];
@@ -89,13 +84,18 @@ impl<'a, 'b, K, V> OccupiedEntry<'a, 'b, K, V> {
}
}
/// An abstract view into a vacant entry within the map.
pub struct VacantEntry<'a, 'b, K, V> {
/// Mutable reference to the map containing this entry.
pub(crate) map: &'b mut CoreHashMap<'a, K, V>,
pub(crate) key: K, // The key to insert
/// The key to be inserted into this entry.
pub(crate) key: K,
/// The position within the dictionary corresponding to the key's hash.
pub(crate) dict_pos: u32,
}
impl<'a, 'b, K: Clone + Hash + Eq, V> VacantEntry<'a, 'b, K, V> {
/// Insert a value into the vacant entry, finding and populating an empty bucket in the process.
pub fn insert(self, value: V) -> Result<&'b mut V, FullError> {
let pos = self.map.alloc_bucket(self.key, value)?;
if pos == INVALID_POS {

View File

@@ -303,15 +303,18 @@ fn test_bucket_ops() {
assert_eq!(writer.get_num_buckets_in_use(), 1);
assert_eq!(writer.get_num_buckets(), 1000);
assert_eq!(writer.get_with_hash(&1.into(), hash), Some(&2));
match writer.entry_with_hash(1.into(), hash) {
let pos = match writer.entry_with_hash(1.into(), hash) {
Entry::Occupied(e) => {
assert_eq!(e._key, 1.into());
let pos = e.bucket_pos as usize;
assert_eq!(writer.entry_at_bucket(pos).unwrap()._key, 1.into());
assert_eq!(writer.get_at_bucket(pos), Some(&(1.into(), 2)));
pos
},
Entry::Vacant(_) => { panic!("Insert didn't affect entry"); },
}
};
let ptr: *const usize = writer.get_with_hash(&1.into(), hash).unwrap();
assert_eq!(writer.get_bucket_for_value(ptr), pos);
writer.remove_with_hash(&1.into(), hash);
assert_eq!(writer.get_with_hash(&1.into(), hash), None);
}