From 083118e98e0b7e2e78d4532b864fc3a6b1e1a9c0 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 2 May 2025 08:52:05 +0300 Subject: [PATCH] Implement epoch system --- Cargo.lock | 5 +- Cargo.toml | 1 + libs/neonart/Cargo.toml | 1 + libs/neonart/src/algorithm/node_ref.rs | 6 +- libs/neonart/src/allocator.rs | 63 ----------- libs/neonart/src/epoch.rs | 146 ++++++++++++++++++++++--- libs/neonart/src/lib.rs | 31 ++++-- 7 files changed, 161 insertions(+), 92 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9c74c0ca6..cebb2684bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1689,9 +1689,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.19" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crossterm" @@ -3905,6 +3905,7 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" name = "neonart" version = "0.1.0" dependencies = [ + "crossbeam-utils", "rand 0.8.5", "spin", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 186388b25c..0cf8d0ba38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,7 @@ clap = { version = "4.0", features = ["derive", "env"] } clashmap = { version = "1.0", features = ["raw-api"] } comfy-table = "7.1" const_format = "0.2" +crossbeam-utils = "0.8.21" crc32c = "0.6" diatomic-waker = { version = "0.2.3" } either = "1.8" diff --git a/libs/neonart/Cargo.toml b/libs/neonart/Cargo.toml index f2a8c9eaed..79a86d3f7e 100644 --- a/libs/neonart/Cargo.toml +++ b/libs/neonart/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +crossbeam-utils.workspace = true spin.workspace = true tracing.workspace = true diff --git a/libs/neonart/src/algorithm/node_ref.rs b/libs/neonart/src/algorithm/node_ref.rs index 4507be9db7..282f979f8f 100644 --- a/libs/neonart/src/algorithm/node_ref.rs +++ b/libs/neonart/src/algorithm/node_ref.rs @@ -13,7 +13,7 @@ use crate::allocator::ArtAllocator; pub struct NodeRef<'e, V> { ptr: NodePtr, - phantom: PhantomData<&'e EpochPin>, + phantom: PhantomData<&'e EpochPin<'e>>, } impl<'e, V> Debug for NodeRef<'e, V> { @@ -50,7 +50,7 @@ pub struct ReadLockedNodeRef<'e, V> { ptr: NodePtr, version: u64, - phantom: PhantomData<&'e EpochPin>, + phantom: PhantomData<&'e EpochPin<'e>>, } pub(crate) enum ChildOrValue<'e, V> { @@ -115,7 +115,7 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> { /// the version after each read. pub struct WriteLockedNodeRef<'e, V> { ptr: NodePtr, - phantom: PhantomData<&'e EpochPin>, + phantom: PhantomData<&'e EpochPin<'e>>, } impl<'e, V: Value> WriteLockedNodeRef<'e, V> { diff --git a/libs/neonart/src/allocator.rs b/libs/neonart/src/allocator.rs index 12a2ac0e68..1b3ba51cfb 100644 --- a/libs/neonart/src/allocator.rs +++ b/libs/neonart/src/allocator.rs @@ -123,66 +123,3 @@ impl<'t, V: crate::Value> ArtAllocator for ArtMultiSlabAllocator<'t, V> { self.inner.dealloc_slab(7, ptr.cast()) } } - -/* -pub struct Allocator { - area: *mut MaybeUninit, - allocated: AtomicUsize, - size: usize, -} - -const MAXALIGN: usize = std::mem::align_of::(); - -impl Allocator { - pub fn new_uninit(area: &'static mut [MaybeUninit]) -> Allocator { - let ptr = area.as_mut_ptr(); - let size = area.len(); - Self::new_from_ptr(ptr, size) - } - - pub fn new(area: &'static mut [u8]) -> Allocator { - let ptr: *mut MaybeUninit = area.as_mut_ptr().cast(); - let size = area.len(); - Self::new_from_ptr(ptr, size) - } - - pub fn new_from_ptr(ptr: *mut MaybeUninit, size: usize) -> Allocator { - let padding = ptr.align_offset(MAXALIGN); - - Allocator { - area: ptr, - allocated: AtomicUsize::new(padding), - size, - } - } - - pub fn alloc<'a, T: Sized>(&'a self, value: T) -> AllocatedBox<'a, T> { - let sz = std::mem::size_of::(); - - // pad all allocations to MAXALIGN boundaries - assert!(std::mem::align_of::() <= MAXALIGN); - let sz = sz.next_multiple_of(MAXALIGN); - - let offset = self.allocated.fetch_add(sz, Ordering::Relaxed); - - if offset + sz > self.size { - panic!("out of memory"); - } - - let inner = unsafe { - let inner = self.area.offset(offset as isize).cast::(); - *inner = value; - NonNull::new_unchecked(inner) - }; - - AllocatedBox { - inner, - _phantom: PhantomData, - } - } - - pub fn _dealloc_node(&self, _node: AllocatedBox) { - // doesn't free it immediately. - } -} -*/ diff --git a/libs/neonart/src/epoch.rs b/libs/neonart/src/epoch.rs index 00019a3b9a..1fce1fbb0b 100644 --- a/libs/neonart/src/epoch.rs +++ b/libs/neonart/src/epoch.rs @@ -1,23 +1,143 @@ //! This is similar to crossbeam_epoch crate, but works in shared memory -//! -//! FIXME: not implemented yet. (We haven't implemented removing any nodes from the ART -//! tree, which is why we get away without this now) -pub(crate) struct EpochPin {} +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; -pub(crate) fn pin_epoch() -> EpochPin { - EpochPin {} +use crossbeam_utils::CachePadded; +use spin; + +const NUM_SLOTS: usize = 1000; + +/// This is the struct that is stored in shmem +/// +/// bit 0: is it pinned or not? +/// rest of the bits are the epoch counter. +pub struct EpochShared { + global_epoch: AtomicU64, + participants: [CachePadded; NUM_SLOTS], + + broadcast_lock: spin::Mutex<()>, } -/* -struct CollectorGlobal { - epoch: AtomicU64, +impl EpochShared { + pub fn new() -> EpochShared { + EpochShared { + global_epoch: AtomicU64::new(2), + participants: [const { CachePadded::new(AtomicU64::new(2)) }; NUM_SLOTS], + broadcast_lock: spin::Mutex::new(()), + } + } - participants: CachePadded, // make it an array + pub fn register(&self) -> LocalHandle { + LocalHandle { + global: self, + last_slot: AtomicUsize::new(0), // todo: choose more intelligently + } + } + + fn release_pin(&self, slot: usize, _epoch: u64) { + let global_epoch = self.global_epoch.load(Ordering::Relaxed); + self.participants[slot].store(global_epoch, Ordering::Relaxed); + } + + fn pin_internal(&self, slot_hint: usize) -> (usize, u64) { + // pick a slot + let mut slot = slot_hint; + let epoch = loop { + let old = self.participants[slot].fetch_or(1, Ordering::Relaxed); + if old & 1 == 0 { + // Got this slot + break old; + } + + // the slot was busy by another thread / process. try a different slot + slot += 1; + if slot == NUM_SLOTS { + slot = 0; + } + continue; + }; + (slot, epoch) + } + + fn advance(&self) -> u64 { + // Advance the global epoch + let old_epoch = self.global_epoch.fetch_add(2, Ordering::Relaxed); + let new_epoch = old_epoch + 2; + + // Anyone that release their pin after this will update their slot. + new_epoch + } + + fn broadcast(&self) { + let Some(_guard) = self.broadcast_lock.try_lock() else { + return; + }; + + let epoch = self.global_epoch.load(Ordering::Relaxed); + let old_epoch = epoch.wrapping_sub(2); + + // Update all free slots. + for i in 0..NUM_SLOTS { + // TODO: check result, as a sanity check. It should either be the old epoch, or pinned + let _ = self.participants[i].compare_exchange( + old_epoch, + epoch, + Ordering::Relaxed, + Ordering::Relaxed, + ); + } + + // FIXME: memory fence here, since we used Relaxed? + } + + fn get_oldest(&self) -> u64 { + // Read all slots. + let now = self.global_epoch.load(Ordering::Relaxed); + let mut oldest = now; + for i in 0..NUM_SLOTS { + let this_epoch = self.participants[i].load(Ordering::Relaxed); + let delta = now.wrapping_sub(this_epoch); + if delta > u64::MAX / 2 { + // this is very recent + } else { + if delta > now.wrapping_sub(oldest) { + oldest = this_epoch; + } + } + } + oldest + } } +pub(crate) struct EpochPin<'e> { + slot: usize, + epoch: u64, -struct CollectorQueue { - + handle: &'e LocalHandle<'e>, +} + +impl<'e> Drop for EpochPin<'e> { + fn drop(&mut self) { + self.handle.global.release_pin(self.slot, self.epoch); + } +} + +pub struct LocalHandle<'g> { + global: &'g EpochShared, + + last_slot: AtomicUsize, +} + +impl<'g> LocalHandle<'g> { + pub fn pin(&self) -> EpochPin { + let (slot, epoch) = self + .global + .pin_internal(self.last_slot.load(Ordering::Relaxed)); + self.last_slot.store(slot, Ordering::Relaxed); + EpochPin { + handle: self, + epoch, + slot, + } + } } -*/ diff --git a/libs/neonart/src/lib.rs b/libs/neonart/src/lib.rs index a9e74da9bd..1cc64a3bce 100644 --- a/libs/neonart/src/lib.rs +++ b/libs/neonart/src/lib.rs @@ -158,6 +158,8 @@ pub struct Tree { root: RootPtr, writer_attached: AtomicBool, + + epoch: epoch::EpochShared, } unsafe impl Sync for Tree {} @@ -183,6 +185,8 @@ where allocator: &'t A, + epoch_handle: epoch::LocalHandle<'t>, + phantom_key: PhantomData, } @@ -194,6 +198,8 @@ where { tree: &'t Tree, + epoch_handle: epoch::LocalHandle<'t>, + phantom_key: PhantomData, } @@ -204,6 +210,7 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator> TreeInitStruct<'t, K, V, let init = Tree { root: algorithm::new_root(allocator), writer_attached: AtomicBool::new(false), + epoch: epoch::EpochShared::new(), }; unsafe { tree_ptr.write(init) }; @@ -223,6 +230,7 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator> TreeInitStruct<'t, K, V, tree: self.tree, allocator: self.allocator, phantom_key: PhantomData, + epoch_handle: self.tree.epoch.register(), } } @@ -230,6 +238,7 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator> TreeInitStruct<'t, K, V, TreeReadAccess { tree: self.tree, phantom_key: PhantomData, + epoch_handle: self.tree.epoch.register(), } } } @@ -240,7 +249,7 @@ impl<'t, K: Key + Clone, V: Value, A: ArtAllocator> TreeWriteAccess<'t, K, V, TreeWriteGuard { allocator: self.allocator, tree: &self.tree, - epoch_pin: epoch::pin_epoch(), + epoch_pin: self.epoch_handle.pin(), phantom_key: PhantomData, } } @@ -248,7 +257,7 @@ impl<'t, K: Key + Clone, V: Value, A: ArtAllocator> TreeWriteAccess<'t, K, V, pub fn start_read(&'t self) -> TreeReadGuard<'t, K, V> { TreeReadGuard { tree: &self.tree, - epoch_pin: epoch::pin_epoch(), + epoch_pin: self.epoch_handle.pin(), phantom_key: PhantomData, } } @@ -258,38 +267,38 @@ impl<'t, K: Key + Clone, V: Value> TreeReadAccess<'t, K, V> { pub fn start_read(&'t self) -> TreeReadGuard<'t, K, V> { TreeReadGuard { tree: &self.tree, - epoch_pin: epoch::pin_epoch(), + epoch_pin: self.epoch_handle.pin(), phantom_key: PhantomData, } } } -pub struct TreeReadGuard<'t, K, V> +pub struct TreeReadGuard<'e, K, V> where K: Key, V: Value, { - tree: &'t Tree, + tree: &'e Tree, - epoch_pin: EpochPin, + epoch_pin: EpochPin<'e>, phantom_key: PhantomData, } -impl<'t, K: Key, V: Value> TreeReadGuard<'t, K, V> { +impl<'e, K: Key, V: Value> TreeReadGuard<'e, K, V> { pub fn get(&self, key: &K) -> Option { algorithm::search(key, self.tree.root, &self.epoch_pin) } } -pub struct TreeWriteGuard<'t, K, V, A> +pub struct TreeWriteGuard<'e, K, V, A> where K: Key, V: Value, { - tree: &'t Tree, - allocator: &'t A, + tree: &'e Tree, + allocator: &'e A, - epoch_pin: EpochPin, + epoch_pin: EpochPin<'e>, phantom_key: PhantomData, }