Implement epoch system

This commit is contained in:
Heikki Linnakangas
2025-05-02 08:52:05 +03:00
parent 54cd2272f1
commit 083118e98e
7 changed files with 161 additions and 92 deletions

5
Cargo.lock generated
View File

@@ -1689,9 +1689,9 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
version = "0.8.19"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "crossterm"
@@ -3905,6 +3905,7 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
name = "neonart"
version = "0.1.0"
dependencies = [
"crossbeam-utils",
"rand 0.8.5",
"spin",
"tracing",

View File

@@ -87,6 +87,7 @@ clap = { version = "4.0", features = ["derive", "env"] }
clashmap = { version = "1.0", features = ["raw-api"] }
comfy-table = "7.1"
const_format = "0.2"
crossbeam-utils = "0.8.21"
crc32c = "0.6"
diatomic-waker = { version = "0.2.3" }
either = "1.8"

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
crossbeam-utils.workspace = true
spin.workspace = true
tracing.workspace = true

View File

@@ -13,7 +13,7 @@ use crate::allocator::ArtAllocator;
pub struct NodeRef<'e, V> {
ptr: NodePtr<V>,
phantom: PhantomData<&'e EpochPin>,
phantom: PhantomData<&'e EpochPin<'e>>,
}
impl<'e, V> Debug for NodeRef<'e, V> {
@@ -50,7 +50,7 @@ pub struct ReadLockedNodeRef<'e, V> {
ptr: NodePtr<V>,
version: u64,
phantom: PhantomData<&'e EpochPin>,
phantom: PhantomData<&'e EpochPin<'e>>,
}
pub(crate) enum ChildOrValue<'e, V> {
@@ -115,7 +115,7 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> {
/// the version after each read.
pub struct WriteLockedNodeRef<'e, V> {
ptr: NodePtr<V>,
phantom: PhantomData<&'e EpochPin>,
phantom: PhantomData<&'e EpochPin<'e>>,
}
impl<'e, V: Value> WriteLockedNodeRef<'e, V> {

View File

@@ -123,66 +123,3 @@ impl<'t, V: crate::Value> ArtAllocator<V> for ArtMultiSlabAllocator<'t, V> {
self.inner.dealloc_slab(7, ptr.cast())
}
}
/*
pub struct Allocator {
area: *mut MaybeUninit<u8>,
allocated: AtomicUsize,
size: usize,
}
const MAXALIGN: usize = std::mem::align_of::<usize>();
impl Allocator {
pub fn new_uninit(area: &'static mut [MaybeUninit<u8>]) -> Allocator {
let ptr = area.as_mut_ptr();
let size = area.len();
Self::new_from_ptr(ptr, size)
}
pub fn new(area: &'static mut [u8]) -> Allocator {
let ptr: *mut MaybeUninit<u8> = area.as_mut_ptr().cast();
let size = area.len();
Self::new_from_ptr(ptr, size)
}
pub fn new_from_ptr(ptr: *mut MaybeUninit<u8>, size: usize) -> Allocator {
let padding = ptr.align_offset(MAXALIGN);
Allocator {
area: ptr,
allocated: AtomicUsize::new(padding),
size,
}
}
pub fn alloc<'a, T: Sized>(&'a self, value: T) -> AllocatedBox<'a, T> {
let sz = std::mem::size_of::<T>();
// pad all allocations to MAXALIGN boundaries
assert!(std::mem::align_of::<T>() <= MAXALIGN);
let sz = sz.next_multiple_of(MAXALIGN);
let offset = self.allocated.fetch_add(sz, Ordering::Relaxed);
if offset + sz > self.size {
panic!("out of memory");
}
let inner = unsafe {
let inner = self.area.offset(offset as isize).cast::<T>();
*inner = value;
NonNull::new_unchecked(inner)
};
AllocatedBox {
inner,
_phantom: PhantomData,
}
}
pub fn _dealloc_node<T>(&self, _node: AllocatedBox<T>) {
// doesn't free it immediately.
}
}
*/

View File

@@ -1,23 +1,143 @@
//! This is similar to the `crossbeam_epoch` crate, but works in shared memory
//!
//! FIXME: not implemented yet. (We haven't implemented removing any nodes from the ART
//! tree, which is why we get away without this now)
pub(crate) struct EpochPin {}
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
pub(crate) fn pin_epoch() -> EpochPin {
EpochPin {}
use crossbeam_utils::CachePadded;
use spin;
const NUM_SLOTS: usize = 1000;
/// Shared state of the epoch system. This is the struct that is stored in shmem.
///
/// Each entry in `participants` is a single `u64` word:
///
/// - bit 0: is the slot pinned or not?
/// - the rest of the bits are the epoch counter.
///
/// `global_epoch` starts at 2 and `advance` bumps it by 2, so its bit 0 stays
/// clear and it can be compared directly against an unpinned slot value.
pub struct EpochShared {
// Current global epoch.
global_epoch: AtomicU64,
// One word per slot; a slot is claimed by setting bit 0 (see `pin_internal`).
participants: [CachePadded<AtomicU64>; NUM_SLOTS],
// Taken with `try_lock` by `broadcast`, so only one caller at a time sweeps the slots.
broadcast_lock: spin::Mutex<()>,
}
/*
struct CollectorGlobal {
epoch: AtomicU64,
impl EpochShared {
/// Create the shared epoch state with every slot unpinned.
///
/// The global epoch and all participant slots start at 2, i.e. epoch
/// counter set and bit 0 (the pin flag) clear.
pub fn new() -> EpochShared {
    let participants = [const { CachePadded::new(AtomicU64::new(2)) }; NUM_SLOTS];
    let global_epoch = AtomicU64::new(2);
    let broadcast_lock = spin::Mutex::new(());

    EpochShared {
        global_epoch,
        participants,
        broadcast_lock,
    }
}
participants: CachePadded<AtomicU64>, // make it an array
/// Create a [`LocalHandle`] that pins epochs against this shared state.
///
/// The handle borrows `self`, which the explicit `'_` in the return type
/// makes visible at the call site. Its slot hint starts at 0.
pub fn register(&self) -> LocalHandle<'_> {
    LocalHandle {
        global: self,
        last_slot: AtomicUsize::new(0), // todo: choose more intelligently
    }
}
/// Unpin `slot`, making it available to be claimed again.
///
/// The slot is overwritten with the current global epoch. That value has
/// bit 0 clear, so the store both drops the pin flag and brings the slot up
/// to date. `_epoch` (the epoch the pin was taken at) is currently unused.
fn release_pin(&self, slot: usize, _epoch: u64) {
    let global_epoch = self.global_epoch.load(Ordering::Relaxed);
    // Release, not Relaxed: every memory access made while the pin was held
    // must be ordered before the slot can be observed as unpinned. Otherwise
    // a reclaimer could see the slot free while those accesses are still
    // in flight.
    // NOTE(review): the side that scans the slots needs a matching Acquire
    // load (or fence) for this ordering to take effect.
    self.participants[slot].store(global_epoch, Ordering::Release);
}
/// Claim a free participant slot, starting the search at `slot_hint`.
///
/// Returns `(slot, epoch)`, where `slot` is the index that was claimed and
/// `epoch` is the value the slot held just before it was claimed. Keeps
/// cycling through the slots (wrapping at `NUM_SLOTS`) until one is free.
fn pin_internal(&self, slot_hint: usize) -> (usize, u64) {
    let mut candidate = slot_hint;
    loop {
        // Setting bit 0 claims the slot; the previous value tells us whether
        // somebody else already held it.
        let previous = self.participants[candidate].fetch_or(1, Ordering::Relaxed);
        if previous & 1 == 0 {
            // Got this slot
            return (candidate, previous);
        }

        // The slot is held by another thread / process. Try the next one,
        // wrapping around at the end of the array.
        candidate = (candidate + 1) % NUM_SLOTS;
    }
}
/// Advance the global epoch by one step and return the new epoch.
///
/// Epochs move in steps of 2, which keeps bit 0 (the pin flag used in the
/// participant slots) clear in the global epoch.
fn advance(&self) -> u64 {
    // Advance the global epoch
    let old_epoch = self.global_epoch.fetch_add(2, Ordering::Relaxed);

    // Anyone who releases their pin after this will update their slot.
    //
    // `fetch_add` wraps around on overflow, so the new value is computed with
    // wrapping arithmetic too. A plain `+` would panic in debug builds at the
    // wrap point and disagree with the `wrapping_sub` comparisons used
    // elsewhere in this module.
    old_epoch.wrapping_add(2)
}
fn broadcast(&self) {
let Some(_guard) = self.broadcast_lock.try_lock() else {
return;
};
let epoch = self.global_epoch.load(Ordering::Relaxed);
let old_epoch = epoch.wrapping_sub(2);
// Update all free slots.
for i in 0..NUM_SLOTS {
// TODO: check result, as a sanity check. It should either be the old epoch, or pinned
let _ = self.participants[i].compare_exchange(
old_epoch,
epoch,
Ordering::Relaxed,
Ordering::Relaxed,
);
}
// FIXME: memory fence here, since we used Relaxed?
}
fn get_oldest(&self) -> u64 {
// Read all slots.
let now = self.global_epoch.load(Ordering::Relaxed);
let mut oldest = now;
for i in 0..NUM_SLOTS {
let this_epoch = self.participants[i].load(Ordering::Relaxed);
let delta = now.wrapping_sub(this_epoch);
if delta > u64::MAX / 2 {
// this is very recent
} else {
if delta > now.wrapping_sub(oldest) {
oldest = this_epoch;
}
}
}
oldest
}
}
pub(crate) struct EpochPin<'e> {
slot: usize,
epoch: u64,
struct CollectorQueue {
handle: &'e LocalHandle<'e>,
}
// Dropping a pin hands its participant slot back to the shared epoch state.
impl<'e> Drop for EpochPin<'e> {
    fn drop(&mut self) {
        let shared = self.handle.global;
        shared.release_pin(self.slot, self.epoch);
    }
}
/// Handle through which epochs are pinned against one [`EpochShared`].
///
/// Created by `EpochShared::register`.
pub struct LocalHandle<'g> {
// The shared epoch state this handle pins against.
global: &'g EpochShared,
// Slot claimed by the most recent `pin`; used as the starting hint for the next one.
last_slot: AtomicUsize,
}
impl<'g> LocalHandle<'g> {
    /// Pin the current epoch.
    ///
    /// Claims a participant slot in the shared state, starting the search at
    /// the slot this handle used last time. The slot is released again when
    /// the returned [`EpochPin`] is dropped.
    ///
    /// The returned pin borrows this handle; the explicit `'_` makes that
    /// borrow visible in the signature.
    pub fn pin(&self) -> EpochPin<'_> {
        let hint = self.last_slot.load(Ordering::Relaxed);
        let (slot, epoch) = self.global.pin_internal(hint);

        // Remember where we ended up so the next pin starts its search there.
        self.last_slot.store(slot, Ordering::Relaxed);

        EpochPin {
            handle: self,
            epoch,
            slot,
        }
    }
}
*/

View File

@@ -158,6 +158,8 @@ pub struct Tree<V: Value> {
root: RootPtr<V>,
writer_attached: AtomicBool,
epoch: epoch::EpochShared,
}
unsafe impl<V: Value + Sync> Sync for Tree<V> {}
@@ -183,6 +185,8 @@ where
allocator: &'t A,
epoch_handle: epoch::LocalHandle<'t>,
phantom_key: PhantomData<K>,
}
@@ -194,6 +198,8 @@ where
{
tree: &'t Tree<V>,
epoch_handle: epoch::LocalHandle<'t>,
phantom_key: PhantomData<K>,
}
@@ -204,6 +210,7 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator<V>> TreeInitStruct<'t, K, V,
let init = Tree {
root: algorithm::new_root(allocator),
writer_attached: AtomicBool::new(false),
epoch: epoch::EpochShared::new(),
};
unsafe { tree_ptr.write(init) };
@@ -223,6 +230,7 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator<V>> TreeInitStruct<'t, K, V,
tree: self.tree,
allocator: self.allocator,
phantom_key: PhantomData,
epoch_handle: self.tree.epoch.register(),
}
}
@@ -230,6 +238,7 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator<V>> TreeInitStruct<'t, K, V,
TreeReadAccess {
tree: self.tree,
phantom_key: PhantomData,
epoch_handle: self.tree.epoch.register(),
}
}
}
@@ -240,7 +249,7 @@ impl<'t, K: Key + Clone, V: Value, A: ArtAllocator<V>> TreeWriteAccess<'t, K, V,
TreeWriteGuard {
allocator: self.allocator,
tree: &self.tree,
epoch_pin: epoch::pin_epoch(),
epoch_pin: self.epoch_handle.pin(),
phantom_key: PhantomData,
}
}
@@ -248,7 +257,7 @@ impl<'t, K: Key + Clone, V: Value, A: ArtAllocator<V>> TreeWriteAccess<'t, K, V,
pub fn start_read(&'t self) -> TreeReadGuard<'t, K, V> {
TreeReadGuard {
tree: &self.tree,
epoch_pin: epoch::pin_epoch(),
epoch_pin: self.epoch_handle.pin(),
phantom_key: PhantomData,
}
}
@@ -258,38 +267,38 @@ impl<'t, K: Key + Clone, V: Value> TreeReadAccess<'t, K, V> {
pub fn start_read(&'t self) -> TreeReadGuard<'t, K, V> {
TreeReadGuard {
tree: &self.tree,
epoch_pin: epoch::pin_epoch(),
epoch_pin: self.epoch_handle.pin(),
phantom_key: PhantomData,
}
}
}
pub struct TreeReadGuard<'t, K, V>
pub struct TreeReadGuard<'e, K, V>
where
K: Key,
V: Value,
{
tree: &'t Tree<V>,
tree: &'e Tree<V>,
epoch_pin: EpochPin,
epoch_pin: EpochPin<'e>,
phantom_key: PhantomData<K>,
}
impl<'t, K: Key, V: Value> TreeReadGuard<'t, K, V> {
impl<'e, K: Key, V: Value> TreeReadGuard<'e, K, V> {
pub fn get(&self, key: &K) -> Option<V> {
algorithm::search(key, self.tree.root, &self.epoch_pin)
}
}
pub struct TreeWriteGuard<'t, K, V, A>
pub struct TreeWriteGuard<'e, K, V, A>
where
K: Key,
V: Value,
{
tree: &'t Tree<V>,
allocator: &'t A,
tree: &'e Tree<V>,
allocator: &'e A,
epoch_pin: EpochPin,
epoch_pin: EpochPin<'e>,
phantom_key: PhantomData<K>,
}