diff --git a/libs/neon-shmem/src/hash.rs b/libs/neon-shmem/src/hash.rs
index 269c5ea9c3..d0f102e837 100644
--- a/libs/neon-shmem/src/hash.rs
+++ b/libs/neon-shmem/src/hash.rs
@@ -344,6 +344,7 @@ where
                 shard_pos: shard_off,
                 bucket_pos: pos,
                 bucket_arr: &map.bucket_arr,
+                key_pos: entry_pos,
             }
         })
     }
@@ -376,7 +377,13 @@ where
         let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
         map.clear();
     }
-
+
+    /// Begin a rehash operation. Converts all existing entries to the `Rehash` state.
+    // TODO: missing logic to prevent further resize operations when one is already underway.
+    // One future feature could be to allow interruptible resizes. We wouldn't pay much of a
+    // space penalty if we used something like https://crates.io/crates/u4 inside EntryTag
+    // to allow for many tiers of older chains (we would have to track previous sizes within
+    // a sliding window at the front of the memory region or something).
    fn begin_rehash(
        &self,
        shards: &mut Vec<RwLockWriteGuard<'a, DictShard<'a, K>>>,
@@ -402,18 +409,23 @@ where
         true
     }
-
-    // TODO(quantumish): off by one for return value logic?
+    // Unfinished; the final large-ish piece standing in the way of a prototype.
+    //
+    // Based on the hashbrown implementation but adapted to an incremental context. See below:
+    // https://github.com/quantumish/hashbrown/blob/6610e6d2b1f288ef7b0709a3efefbc846395dc5e/src/raw/mod.rs#L2866
     fn do_rehash(&self) -> bool {
         let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
         // TODO(quantumish): refactor these out into settable quantities
         const REHASH_CHUNK_SIZE: usize = 10;
-        const REHASH_ATTEMPTS: usize = 5;
         let end = map.rehash_end.load(Ordering::Relaxed);
         let ind = map.rehash_index.load(Ordering::Relaxed);
         if ind >= end {
             return true
         }
+        // We have to use a mutex to prevent concurrent rehashes, as they present a fairly
+        // obvious chance of deadlock: one thread wants to rehash an entry into a shard
+        // held by another thread, which in turn wants to rehash its block into the shard
+        // held by the first. Doesn't seem like there's an obvious way around this?
         let _guard = self.resize_lock.try_lock();
         if _guard.is_none() {
             return false
         }
@@ -438,27 +450,31 @@ where
                 EntryTag::Tombstone => core::MapEntryType::Skip,
                 _ => core::MapEntryType::Tombstone,
             }).unwrap();
-            let new_pos = new.pos();
-            match new.tag() {
-                EntryTag::Empty | EntryTag::RehashTombstone => {
-                    shard.keys[shard_off].tag = EntryTag::Empty;
-                    unsafe {
-                        std::mem::swap(
-                            shard.keys[shard_off].val.assume_init_mut(),
-                            new.
-                },
-                EntryTag::Rehash => {
+            // I believe the blocker here is that this would unfortunately require
+            // duplicating a lot of the logic of a write lookup again, but with the caveat
+            // that we're already holding one of the shard locks and need to pass that
+            // context on. One thing I was considering at the time was using a hashmap to
+            // manage the lock guards and passing that around?
+            todo!("finish rehash implementation")
+            // match new.tag() {
+            //     EntryTag::Empty | EntryTag::RehashTombstone => {
+            //         shard.keys[shard_off].tag = EntryTag::Empty;
+            //         unsafe {
+            //             std::mem::swap(
+            //                 shard.keys[shard_off].val.assume_init_mut(),
+            //                 new.
+            //     },
+            //     EntryTag::Rehash => {
-                },
-                _ => unreachable!()
-            }
+            //     },
+            //     _ => unreachable!()
+            // }
             }
         }
         false
     }
-
     pub fn finish_rehash(&self) {
         let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
         while self.do_rehash() {}
@@ -572,7 +588,7 @@ where
     pub fn shrink_goal(&self) -> Option<usize> {
         let map = unsafe { self.shared_ptr.as_mut() }.unwrap();
         let goal = map.bucket_arr.alloc_limit.load(Ordering::Relaxed);
-        goal.next_checkeddd()
+        goal.next_checked()
     }
 
     pub fn finish_shrink(&self) -> Result<(), shmem::Error> {
@@ -582,7 +598,7 @@ where
         let num_buckets = map.bucket_arr.alloc_limit
             .load(Ordering::Relaxed)
-            .next_checkeddd()
+            .next_checked()
             .expect("called finish_shrink when no shrink is in progress");
 
         if map.get_num_buckets() == num_buckets {
diff --git a/libs/neon-shmem/src/hash/bucket.rs b/libs/neon-shmem/src/hash/bucket.rs
index 2cae84472d..29c1c3c14c 100644
--- a/libs/neon-shmem/src/hash/bucket.rs
+++ b/libs/neon-shmem/src/hash/bucket.rs
@@ -1,3 +1,20 @@
+//! Lock-free stable array of buckets managed with a freelist.
+//!
+//! Since the positions of entries in the dictionary and the bucket array are not correlated,
+//! we either had to separately shard both and deal with the overhead of two lock acquisitions
+//! per read/write, or make the bucket array lock free. This is *generally* fine since most
+//! accesses of the bucket array are done while holding the lock on the corresponding dict shard
+//! and are thus synchronized. It may not hold up to the removals done by the LFC, which is a problem.
+//!
+//! Routines are pretty closely adapted from https://timharris.uk/papers/2001-disc.pdf
+//!
+//! Notable caveats:
+//! - Can only store around 2^30 entries, which is actually only 10x our current workload.
+//!   - This is because we need two tag bits to distinguish full/empty and marked/unmarked entries.
+//! - Has not been seriously tested.
+//!
+//! Full entries also store the index of their corresponding dictionary entry in order
+//! to enable .entry_at_bucket(), which is needed for the clock eviction algo in the LFC.
 use std::cell::UnsafeCell;
 use std::mem::MaybeUninit;
 
@@ -9,6 +26,7 @@ use atomic::Atomic;
 #[repr(transparent)]
 pub(crate) struct BucketIdx(pub(super) u32);
 
+// This should always be true as `BucketIdx` is a simple newtype.
 const _: () = assert!(Atomic::<BucketIdx>::is_lock_free());
 
 impl BucketIdx {
@@ -20,8 +38,10 @@ impl BucketIdx {
     const FULL_TAG: u32 = 0b10 << 30;
     /// Reserved. Don't use me.
     const RSVD_TAG: u32 = 0b11 << 30;
-
+
+    /// Invalid index within the bucket array (can be mixed with any tag).
     pub const INVALID: Self = Self(0x3FFFFFFF);
+    /// Max index within the bucket array (can be mixed with any tag).
     pub const MAX: usize = Self::INVALID.0 as usize - 1;
 
     pub(super) fn is_marked(&self) -> bool {
@@ -45,15 +65,17 @@ impl BucketIdx {
         debug_assert!(val < Self::MAX);
         Self(val as u32 | Self::FULL_TAG)
     }
-
+
+    /// Try to extract a valid index if the tag is NEXT.
     pub fn next_checked(&self) -> Option<usize> {
-        if *self == Self::INVALID || self.is_marked() {
-            None
-        } else {
+        if self.0 & Self::RSVD_TAG == Self::NEXT_TAG && *self != Self::INVALID {
             Some(self.0 as usize)
+        } else {
+            None
         }
     }
 
+    /// Try to extract an index if the tag is FULL.
     pub fn full_checked(&self) -> Option<usize> {
         if self.0 & Self::RSVD_TAG == Self::FULL_TAG {
             Some((self.0 & Self::INVALID.0) as usize)
         } else {
             None
         }
     }
 }
 
-impl std::fmt::Debug for BucketIdx {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
-        let idx = self.get_unmarked().0;
-        write!(
-            f, "BucketIdx(marked={}, idx={})",
-            self.is_marked(),
-            match *self {
-                Self::INVALID => "INVALID".to_string(),
-                _ => format!("{idx}")
-            }
-        )
-    }
-}
-
-/// format storage unit within the hash table. Either empty or contains a key-value pair.
-/// Always part of a chain of some kind (either a freelist if empty or a hash chain if full).
+/// Entry within the bucket array. Value is only initialized if the bucket is full.
 pub(crate) struct Bucket<V> {
-    pub val: MaybeUninit<V>,
+    // Only initialized if the `next` field is tagged with FULL.
+    pub val: MaybeUninit<V>,
+    // Either points to the next entry in the freelist if empty, or points
+    // to the corresponding entry in the dictionary if full.
     pub next: Atomic<BucketIdx>,
 }
 
@@ -92,13 +102,6 @@ impl<V> Bucket<V> {
         }
     }
 
-    pub fn full(val: V) -> Self {
-        Self {
-            val: MaybeUninit::new(val),
-            next: Atomic::new(BucketIdx::INVALID)
-        }
-    }
-
     pub fn as_ref(&self) -> &V {
         unsafe { self.val.assume_init_ref() }
     }
@@ -164,7 +167,9 @@ impl<'a, V> BucketArray<'a, V> {
     pub fn len(&self) -> usize {
         unsafe { (&*self.buckets.get()).len() }
     }
-
+
+    /// Deallocate a bucket, adding it to the free list.
+    // Adapted from List::insert in https://timharris.uk/papers/2001-disc.pdf
     pub fn dealloc_bucket(&self, pos: usize) -> V {
         loop {
             let free = self.free_head.load(Ordering::Relaxed);
@@ -178,6 +183,8 @@
         }
     }
 
+    /// Find a usable bucket at the front of the free list.
+    // Adapted from List::search in https://timharris.uk/papers/2001-disc.pdf
     #[allow(unused_assignments)]
     fn find_bucket(&self) -> (BucketIdx, BucketIdx) {
         let mut left_node = BucketIdx::INVALID;
@@ -229,9 +236,10 @@
         }
     }
 
+    /// Pop a bucket from the free list.
+    // Adapted from List::delete in https://timharris.uk/papers/2001-disc.pdf
     #[allow(unused_assignments)]
     pub(crate) fn alloc_bucket(&self, value: V, key_pos: usize) -> Option<BucketIdx> {
-        // println!("alloc()");
         let mut right_node_next = BucketIdx::INVALID;
         let mut left_idx = BucketIdx::INVALID;
         let mut right_idx = BucketIdx::INVALID;
diff --git a/libs/neon-shmem/src/hash/core.rs b/libs/neon-shmem/src/hash/core.rs
index f8b426ff2d..33921aae83 100644
--- a/libs/neon-shmem/src/hash/core.rs
+++ b/libs/neon-shmem/src/hash/core.rs
@@ -1,4 +1,8 @@
-//! Simple hash table with chaining.
+//! Sharded linear probing hash table.
+
+//! NOTE/FIXME: one major bug with this design is that the current hashmap DOES NOT TRACK
+//! the previous size of the hashmap and thus does lookups incorrectly/badly. This should
+//! be a reasonably minor fix?
 
 use std::cell::UnsafeCell;
 use std::hash::Hash;
@@ -11,27 +15,45 @@ use crate::hash::{
     bucket::{BucketArray, Bucket, BucketIdx}
 };
 
+/// Metadata tag for the type of an entry in the hashmap.
 #[derive(PartialEq, Eq, Clone, Copy)]
 pub(crate) enum EntryTag {
+    /// An occupied entry inserted after a resize operation.
     Occupied,
+    /// An occupied entry inserted before a resize operation,
+    /// a.k.a. an entry that needs to be rehashed at some point.
     Rehash,
+    /// An entry that was once `Occupied`.
     Tombstone,
+    /// An entry that was once `Rehash`.
     RehashTombstone,
+    /// An empty entry.
     Empty,
 }
 
+/// Searching the chains of a hashmap oftentimes requires interpreting
+/// a set of metadata tags differently. This enum encodes the ways a
+/// metadata tag can be treated during a lookup.
 pub(crate) enum MapEntryType {
+    /// Should be treated as if it were occupied.
     Occupied,
+    /// Should be treated as if it were a tombstone.
     Tombstone,
+    /// Should be treated as if it were empty.
     Empty,
+    /// Should be ignored.
     Skip
 }
 
+/// A key within the dictionary component of the hashmap.
 pub(crate) struct EntryKey<K> {
+    // NOTE: This could be split out to save 3 bytes per entry!
+    // Wasn't sure it was worth the penalty of another shmem area.
     pub(crate) tag: EntryTag,
     pub(crate) val: MaybeUninit<K>,
 }
 
+/// A shard of the dictionary.
 pub(crate) struct DictShard<'a, K> {
     pub(crate) keys: &'a mut [EntryKey<K>],
     pub(crate) idxs: &'a mut [BucketIdx],
@@ -43,7 +65,7 @@ impl<'a, K> DictShard<'a, K> {
     }
 }
 
-pub(crate) struct MaybeUninitDictShard<'a, K> {
+ pub(crate) struct MaybeUninitDictShard<'a, K> {
     pub(crate) keys: &'a mut [MaybeUninit<EntryKey<K>>],
     pub(crate) idxs: &'a mut [MaybeUninit<BucketIdx>],
 }
@@ -52,8 +74,11 @@ pub(crate) struct CoreHashMap<'a, K, V> {
     /// Dictionary used to map hashes to bucket indices.
     pub(crate) dict_shards: &'a mut [RwLock<DictShard<'a, K>>],
+    /// Stable bucket array used to store the values.
     pub(crate) bucket_arr: BucketArray<'a, V>,
-    pub(crate) rehash_index: AtomicUsize,
+    /// Index of the next entry to process for rehashing.
+    pub(crate) rehash_index: AtomicUsize,
+    /// Index of the end of the range to be rehashed.
     pub(crate) rehash_end: AtomicUsize,
 }
@@ -108,11 +133,14 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> {
             bucket_arr: BucketArray::new(buckets_cell),
         }
     }
-
+
+    /// Get the value associated with a key (if it exists) given its hash.
     pub fn get_with_hash(&'a self, key: &K, hash: u64) -> Option<ValueReadGuard<'a, V>> {
         let ind = self.rehash_index.load(Ordering::Relaxed);
         let end = self.rehash_end.load(Ordering::Relaxed);
+        // First search the chains from the current context (thus treat
+        // to-be-rehashed entries as tombstones within a current chain).
         let res = self.get(key, hash, |tag| match tag {
             EntryTag::Empty => MapEntryType::Empty,
             EntryTag::Occupied => MapEntryType::Occupied,
         if res.is_some() {
             return res;
         }
-
+
         if ind < end {
+            // Search chains from the previous size of the map if a rehash is in progress.
+            // Ignore any entries inserted since the resize operation occurred.
             self.get(key, hash, |tag| match tag {
                 EntryTag::Empty => MapEntryType::Empty,
                 EntryTag::Rehash => MapEntryType::Occupied,
@@ -140,6 +170,8 @@
         let res = self.entry(key.clone(), hash, |tag| match tag {
             EntryTag::Empty => MapEntryType::Empty,
             EntryTag::Occupied => MapEntryType::Occupied,
+            // We can't treat old entries as tombstones here, as we definitely can't
+            // insert over them! Instead we can just skip directly over them.
             EntryTag::Rehash => MapEntryType::Skip,
             _ => MapEntryType::Tombstone,
         });
@@ -159,7 +191,6 @@ impl<'a, K: Clone + Hash + Eq, V> CoreHashMap<'a, K, V> {
         }
     }
 
-    /// Get the value associated with a key (if it exists) given its hash.
     fn get<F>(&'a self, key: &K, hash: u64, f: F) -> Option<ValueReadGuard<'a, V>>
     where F: Fn(EntryTag) -> MapEntryType {
@@ -191,26 +222,39 @@
         None
     }
 
+
     pub fn entry<F>(&'a self, key: K, hash: u64, f: F) -> Result<Entry<'a, K, V>, FullError>
     where F: Fn(EntryTag) -> MapEntryType {
         // We need to keep holding on the locks for each shard we process since if we don't find the
         // key anywhere, we want to insert it at the earliest possible position (which may be several
         // shards away). Ideally cross-shard chains are quite rare, so this shouldn't be a big deal.
+        //
+        // NB: Somewhat real chance of a deadlock! E.g. one thread has a ridiculously long chain that
+        // starts at block N and wraps around the hashmap to N-1, while another thread begins a lookup
+        // at N-1 during this and has a chain that spans a few shards. Then thread 1 is blocked on
+        // thread 2 to get to shard N-1, but thread 2 is blocked on thread 1 to get to shard N. Pretty
+        // fringe case since chains shouldn't last very long, but still a problem with this somewhat
+        // naive sharding mechanism.
+        //
+        // We could fix this by either refusing to hold locks and only inserting into the earliest entry
+        // within the current shard (which effectively means after a while we forget about certain open
+        // entries at the end of shards), or by pivoting to a more involved concurrency setup?
         let mut shards = Vec::new();
         let mut insert_pos = None;
         let mut insert_shard = None;
 
         let num_buckets = self.get_num_buckets();
         let shard_size = num_buckets / self.dict_shards.len();
-        let bucket_pos = hash as usize % num_buckets;
-        let shard_start = bucket_pos / shard_size;
+        let mut entry_pos = hash as usize % num_buckets;
+        let shard_start = entry_pos / shard_size;
 
         for off in 0..self.dict_shards.len() {
             let shard_idx = (shard_start + off) % self.dict_shards.len();
             let shard = self.dict_shards[shard_idx].write();
             let mut inserted = false;
-            let entry_start = if off == 0 { bucket_pos % shard_size } else { 0 };
+            let entry_start = if off == 0 { entry_pos % shard_size } else { 0 };
             for entry_idx in entry_start..shard.len() {
+                entry_pos += 1;
                 match f(shard.keys[entry_idx].tag) {
                     MapEntryType::Skip => continue,
                     MapEntryType::Empty => {
@@ -243,6 +287,7 @@
                         shard_pos: entry_idx,
                         bucket_pos,
                         bucket_arr: &self.bucket_arr,
+                        key_pos: entry_pos,
                     }));
                 }
             }
diff --git a/libs/neon-shmem/src/hash/entry.rs b/libs/neon-shmem/src/hash/entry.rs
index ee3d5cac70..9196b6ec7b 100644
--- a/libs/neon-shmem/src/hash/entry.rs
+++ b/libs/neon-shmem/src/hash/entry.rs
@@ -8,23 +8,11 @@
 use crate::sync::{RwLockWriteGuard, ValueWriteGuard};
 
 use std::hash::Hash;
 
-use super::core::EntryKey;
-
 pub enum Entry<'a, K, V> {
     Occupied(OccupiedEntry<'a, K, V>),
     Vacant(VacantEntry<'a, K, V>),
 }
 
-impl<'a, K, V> Entry<'a, K, V> {
-    pub fn loc(&self) -> (RwLockWriteGuard<'a, DictShard<'a, K>>, usize) {
-        match self {
-            Self::Occupied(o) => o.shard.keys[o.shard_pos].tag,
-            Self::Vacant(o) => o.shard.keys[o.shard_pos].tag
-        }
-    }
-
-}
-
 pub struct OccupiedEntry<'a, K, V> {
     /// Mutable reference to the shard of the map the entry is in.
     pub(crate) shard: RwLockWriteGuard<'a, DictShard<'a, K>>,
@@ -81,7 +69,7 @@ impl<'a, K: Clone + Hash + Eq, V> VacantEntry<'a, K, V> {
             .expect("bucket is available if entry is");
         self.shard.keys[self.shard_pos].tag = EntryTag::Occupied;
         self.shard.keys[self.shard_pos].val.write(self._key);
-        let idx = pos.next_checkeddd().expect("position is valid");
+        let idx = pos.next_checked().expect("position is valid");
         self.shard.idxs[self.shard_pos] = pos;
 
         RwLockWriteGuard::map(self.shard, |_| {
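
Appended notes, outside the patch proper.

First, the lock-ordering discussion in `CoreHashMap::entry` comes down to the probing/shard arithmetic: the hash picks a starting slot, the slot picks the starting shard, and probing continues into later shards, so a single lookup can end up holding several shard locks at once. The snippet below is a standalone sketch of just that addressing (simplified names, no locking, not code from the crate):

```rust
// Standalone sketch of the shard addressing used by the probing lookup:
// `probe_order` reproduces the `shard_start` / `entry_start` arithmetic from
// `CoreHashMap::entry`, but with simplified names and without any locking.
fn probe_order(hash: u64, num_buckets: usize, num_shards: usize) -> Vec<(usize, usize)> {
    let shard_size = num_buckets / num_shards;
    let start_slot = hash as usize % num_buckets;
    let start_shard = start_slot / shard_size;
    (0..num_shards)
        .map(|off| {
            let shard = (start_shard + off) % num_shards;
            // Only the first shard visited starts mid-shard; later shards are
            // scanned from their beginning.
            let first_entry = if off == 0 { start_slot % shard_size } else { 0 };
            (shard, first_entry)
        })
        .collect()
}

fn main() {
    // 16 buckets split across 4 shards: a hash landing on slot 9 starts in
    // shard 2 at offset 1 and can spill over into shards 3, 0 and 1 in turn,
    // which is why a single insert may end up holding more than one shard lock.
    assert_eq!(probe_order(9, 16, 4), vec![(2, 1), (3, 0), (0, 0), (1, 0)]);
    println!("{:?}", probe_order(9, 16, 4));
}
```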
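Second, for the unfinished `do_rehash` body: below is a standalone, single-threaded sketch of what one incremental rehash step could look like under the `EntryTag` scheme documented in core.rs. The `rehash_step` helper, the `Slot`/`Tag` model, and the choice to leave a `RehashTombstone` in the vacated slot are illustrative assumptions, not the crate's actual implementation; the real version still has to route the move through the shard locks and the bucket array, which is exactly what the `todo!` defers.

```rust
// Standalone, single-threaded sketch of one incremental rehash step. The
// `Slot`/`Tag` types and the decision to leave a `RehashTombstone` in the
// vacated slot are assumptions for illustration, not the crate's real layout.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Tag { Empty, Occupied, Rehash, Tombstone, RehashTombstone }

#[derive(Clone)]
struct Slot { tag: Tag, key: Option<u64>, val: Option<u64> }

fn rehash_step(slots: &mut [Slot], idx: usize, new_capacity: usize) {
    if slots[idx].tag != Tag::Rehash {
        return; // nothing to migrate at this index
    }
    let key = slots[idx].key.take().unwrap();
    let val = slots[idx].val.take().unwrap();
    // The vacated slot keeps a rehash tombstone so that lookups walking the
    // *old* chains still probe past it instead of terminating early, while
    // inserts in the new context may reuse it.
    slots[idx].tag = Tag::RehashTombstone;
    // Re-insert under the new capacity with plain linear probing.
    let mut pos = (key as usize) % new_capacity;
    loop {
        match slots[pos].tag {
            Tag::Empty | Tag::Tombstone | Tag::RehashTombstone => {
                slots[pos] = Slot { tag: Tag::Occupied, key: Some(key), val: Some(val) };
                return;
            }
            _ => pos = (pos + 1) % new_capacity,
        }
    }
}

fn main() {
    let mut slots = vec![Slot { tag: Tag::Empty, key: None, val: None }; 8];
    // Pretend key 5 was inserted while the table had 4 buckets (5 % 4 == 1)
    // and is now waiting to be rehashed into an 8-bucket table (5 % 8 == 5).
    slots[1] = Slot { tag: Tag::Rehash, key: Some(5), val: Some(50) };
    rehash_step(&mut slots, 1, 8);
    assert_eq!(slots[1].tag, Tag::RehashTombstone);
    assert_eq!(slots[5].tag, Tag::Occupied);
    println!("entry for key 5 migrated from slot 1 to slot 5");
}
```

In the real map, the first half of the step would presumably run under the source shard's write lock and the re-insert under the destination shard's, which is where the try-lock and guard-passing questions in the patch comments come in.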
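Finally, since the bucket.rs module docs point at Harris's lock-free list paper for the freelist routines, here is a minimal sketch of the mark-then-unlink idea using the same two-tag-bit / 30-index-bit layout. It is deliberately simpler than `find_bucket`/`alloc_bucket` (no left/right node search, no dictionary back-pointer), and `MARK_TAG`, `try_pop`, and the `links` slice are names invented for the sketch, not the crate's:

```rust
// Standalone sketch of the mark-then-unlink freelist pop described in the
// bucket.rs module docs (after Harris 2001). The 2 tag bits / 30 index bits
// split mirrors `BucketIdx`, but everything else here is a simplification.
use std::sync::atomic::{AtomicU32, Ordering};

const MARK_TAG: u32 = 0b01 << 30; // link is logically deleted
const IDX_MASK: u32 = 0x3FFF_FFFF; // low 30 bits hold the index
const INVALID: u32 = IDX_MASK; // end-of-list sentinel

fn index(v: u32) -> usize {
    (v & IDX_MASK) as usize
}

/// Pop the bucket at the head of the freelist: first mark its link
/// (logical delete), then swing the head past it (physical unlink).
fn try_pop(head: &AtomicU32, links: &[AtomicU32]) -> Option<usize> {
    let first = head.load(Ordering::Acquire);
    if index(first) == index(INVALID) {
        return None; // freelist is empty
    }
    let link = &links[index(first)];
    let next = link.load(Ordering::Acquire);
    if next & MARK_TAG != 0 {
        return None; // another thread is already unlinking this node
    }
    // Step 1: logical deletion by setting the mark bit on the node's link.
    if link
        .compare_exchange(next, next | MARK_TAG, Ordering::AcqRel, Ordering::Relaxed)
        .is_err()
    {
        return None;
    }
    // Step 2: physical unlink. If this CAS loses a race, a later traversal
    // is expected to clean up the marked node, as in the paper's search.
    let _ = head.compare_exchange(first, next, Ordering::AcqRel, Ordering::Relaxed);
    Some(index(first))
}

fn main() {
    // Freelist: head -> bucket 0 -> bucket 1 -> INVALID
    let head = AtomicU32::new(0);
    let links = [AtomicU32::new(1), AtomicU32::new(INVALID)];
    assert_eq!(try_pop(&head, &links), Some(0));
    assert_eq!(head.load(Ordering::Relaxed), 1);
    println!("popped bucket 0; freelist head is now bucket 1");
}
```

The point of the two-step CAS in this sketch is that a bucket can never be handed out twice: whichever thread wins the mark CAS owns the node, even if swinging the head is delayed.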