diff --git a/libs/neonart/src/algorithm.rs b/libs/neonart/src/algorithm.rs index eb580f7722..93c7d9274b 100644 --- a/libs/neonart/src/algorithm.rs +++ b/libs/neonart/src/algorithm.rs @@ -4,17 +4,44 @@ mod node_ref; use std::vec::Vec; -use crate::algorithm::lock_and_version::ResultOrRestart; -use crate::algorithm::node_ptr::{MAX_PREFIX_LEN, NodePtr}; +use crate::algorithm::lock_and_version::ConcurrentUpdateError; +use crate::algorithm::node_ptr::MAX_PREFIX_LEN; use crate::algorithm::node_ref::ChildOrValue; -use crate::algorithm::node_ref::{NodeRef, ReadLockedNodeRef, WriteLockedNodeRef}; +use crate::algorithm::node_ref::{NewNodeRef, NodeRef, ReadLockedNodeRef, WriteLockedNodeRef}; +use crate::allocator::OutOfMemoryError; +use crate::GarbageQueueFullError; +use crate::TreeWriteGuard; use crate::allocator::ArtAllocator; use crate::epoch::EpochPin; use crate::{Key, Value}; pub(crate) type RootPtr = node_ptr::NodePtr; +pub enum ArtError { + ConcurrentUpdate, // need to retry + OutOfMemory, + GarbageQueueFull, +} + +impl From for ArtError { + fn from(_: ConcurrentUpdateError) -> ArtError { + ArtError::ConcurrentUpdate + } +} + +impl From for ArtError { + fn from(_: OutOfMemoryError) -> ArtError { + ArtError::OutOfMemory + } +} + +impl From for ArtError { + fn from(_: GarbageQueueFullError) -> ArtError { + ArtError::GarbageQueueFull + } +} + pub fn new_root(allocator: &impl ArtAllocator) -> RootPtr { node_ptr::new_root(allocator) } @@ -33,12 +60,11 @@ pub(crate) fn search<'e, K: Key, V: Value>( } } -pub(crate) fn update_fn<'e, K: Key, V: Value, F>( +pub(crate) fn update_fn<'e, K: Key, V: Value, A: ArtAllocator, F>( key: &K, value_fn: F, root: RootPtr, - allocator: &impl ArtAllocator, - epoch_pin: &'e EpochPin, + guard: &'e TreeWriteGuard, ) where F: FnOnce(Option<&V>) -> Option, { @@ -52,8 +78,7 @@ pub(crate) fn update_fn<'e, K: Key, V: Value, F>( this_value_fn, root_ref, None, - allocator, - epoch_pin, + guard, 0, key_bytes, ) { @@ -77,7 +102,7 @@ fn lookup_recurse<'e, V: Value>( node: NodeRef<'e, V>, parent: Option>, epoch_pin: &'e EpochPin, -) -> ResultOrRestart> { +) -> Result, ConcurrentUpdateError> { let rnode = node.read_lock_or_restart()?; if let Some(parent) = parent { parent.read_unlock_or_restart()?; @@ -107,16 +132,15 @@ fn lookup_recurse<'e, V: Value>( } // This corresponds to the 'insertOpt' function in the paper -pub(crate) fn update_recurse<'e, V: Value, F>( +pub(crate) fn update_recurse<'e, K: Key, V: Value, A: ArtAllocator, F>( key: &[u8], value_fn: F, node: NodeRef<'e, V>, rparent: Option<(ReadLockedNodeRef, u8)>, - allocator: &impl ArtAllocator, - epoch_pin: &'e EpochPin, + guard: &'e TreeWriteGuard, level: usize, orig_key: &[u8], -) -> ResultOrRestart<()> +) -> Result<(), ArtError> where F: FnOnce(Option<&V>) -> Option, { @@ -129,14 +153,7 @@ where let mut wnode = rnode.upgrade_to_write_lock_or_restart()?; if let Some(new_value) = value_fn(None) { - insert_split_prefix( - key, - new_value, - &mut wnode, - &mut wparent, - parent_key, - allocator, - ); + insert_split_prefix(key, new_value, &mut wnode, &mut wparent, parent_key, guard)?; } wnode.write_unlock(); wparent.write_unlock(); @@ -155,7 +172,7 @@ where let wnode = rnode.upgrade_to_write_lock_or_restart()?; if let Some(new_value) = value_fn(None) { - insert_and_grow(key, new_value, &wnode, &mut wparent, parent_key, allocator); + insert_and_grow(key, new_value, &wnode, &mut wparent, parent_key, guard)?; wnode.write_unlock_obsolete(); wparent.write_unlock(); } else { @@ -168,7 +185,7 @@ where rparent.read_unlock_or_restart()?; } if let Some(new_value) = value_fn(None) { - insert_to_node(&mut wnode, key, new_value, allocator); + insert_to_node(&mut wnode, key, new_value, guard)?; } wnode.write_unlock(); } @@ -203,8 +220,7 @@ where value_fn, next_child, Some((rnode, key[0])), - allocator, - epoch_pin, + guard, level + 1, orig_key, ) @@ -233,7 +249,7 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>( node: NodeRef<'e, V>, epoch_pin: &'e EpochPin, level: usize, -) -> ResultOrRestart<()> { +) -> Result<(), ConcurrentUpdateError> { let indent = str::repeat(" ", level); let rnode = node.read_lock_or_restart()?; @@ -278,81 +294,92 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>( /// [foo]b -> [a]r -> value /// e -> [ls]e -> value ///``` -fn insert_split_prefix<'a, V: Value>( +fn insert_split_prefix<'e, K: Key, V: Value, A: ArtAllocator>( key: &[u8], value: V, node: &mut WriteLockedNodeRef, parent: &mut WriteLockedNodeRef, parent_key: u8, - allocator: &impl ArtAllocator, -) { + guard: &'e TreeWriteGuard, +) -> Result<(), OutOfMemoryError> { let old_node = node; let old_prefix = old_node.get_prefix(); let common_prefix_len = common_prefix(key, old_prefix); // Allocate a node for the new value. - let new_value_node = allocate_node_for_value(&key[common_prefix_len + 1..], value, allocator); + let new_value_node = + allocate_node_for_value(&key[common_prefix_len + 1..], value, guard.allocator)?; // Allocate a new internal node with the common prefix - let mut prefix_node = node_ref::new_internal(&key[..common_prefix_len], allocator); + // FIXME: deallocate 'new_value_node' on OOM + let mut prefix_node = node_ref::new_internal(&key[..common_prefix_len], guard.allocator)?; // Add the old node and the new nodes to the new internal node - prefix_node.insert_child(old_prefix[common_prefix_len], old_node.as_ptr()); - prefix_node.insert_child(key[common_prefix_len], new_value_node); + prefix_node.insert_old_child(old_prefix[common_prefix_len], old_node); + prefix_node.insert_new_child(key[common_prefix_len], new_value_node); // Modify the prefix of the old child in place old_node.truncate_prefix(old_prefix.len() - common_prefix_len - 1); // replace the pointer in the parent parent.replace_child(parent_key, prefix_node.into_ptr()); + + Ok(()) } -fn insert_to_node( +fn insert_to_node<'e, K: Key, V: Value, A: ArtAllocator>( wnode: &mut WriteLockedNodeRef, key: &[u8], value: V, - allocator: &impl ArtAllocator, -) { + guard: &'e TreeWriteGuard, +) -> Result<(), OutOfMemoryError> { if wnode.is_leaf() { wnode.insert_value(key[0], value); } else { - let value_child = allocate_node_for_value(&key[1..], value, allocator); - wnode.insert_child(key[0], value_child); + let value_child = allocate_node_for_value(&key[1..], value, guard.allocator)?; + wnode.insert_child(key[0], value_child.into_ptr()); } + Ok(()) } // On entry: 'parent' and 'node' are locked -fn insert_and_grow( +fn insert_and_grow<'e, K: Key, V: Value, A: ArtAllocator>( key: &[u8], value: V, wnode: &WriteLockedNodeRef, parent: &mut WriteLockedNodeRef, parent_key_byte: u8, - allocator: &impl ArtAllocator, -) { - let mut bigger_node = wnode.grow(allocator); + guard: &'e TreeWriteGuard, +) -> Result<(), ArtError> { + let mut bigger_node = wnode.grow(guard.allocator)?; if wnode.is_leaf() { bigger_node.insert_value(key[0], value); } else { - let value_child = allocate_node_for_value(&key[1..], value, allocator); - bigger_node.insert_child(key[0], value_child); + // FIXME: deallocate 'bigger_node' on OOM + let value_child = allocate_node_for_value(&key[1..], value, guard.allocator)?; + bigger_node.insert_new_child(key[0], value_child); } // Replace the pointer in the parent parent.replace_child(parent_key_byte, bigger_node.into_ptr()); + + // FIXME: if this errors out, deallocate stuff we already allocated + guard.remember_obsolete_node(wnode.as_ptr())?; + + Ok(()) } // Allocate a new leaf node to hold 'value'. If key is long, we may need to allocate // new internal nodes to hold it too -fn allocate_node_for_value( +fn allocate_node_for_value<'a, V: Value, A: ArtAllocator>( key: &[u8], value: V, - allocator: &impl ArtAllocator, -) -> NodePtr { + allocator: &'a A, +) -> Result, OutOfMemoryError> { let mut prefix_off = key.len().saturating_sub(MAX_PREFIX_LEN + 1); - let mut leaf_node = node_ref::new_leaf(&key[prefix_off..key.len() - 1], allocator); + let mut leaf_node = node_ref::new_leaf(&key[prefix_off..key.len() - 1], allocator)?; leaf_node.insert_value(*key.last().unwrap(), value); let mut node = leaf_node; @@ -364,12 +391,12 @@ fn allocate_node_for_value( let mut internal_node = node_ref::new_internal( &remain_prefix[prefix_off..remain_prefix.len() - 1], allocator, - ); - internal_node.insert_child(*remain_prefix.last().unwrap(), node.into_ptr()); + )?; + internal_node.insert_new_child(*remain_prefix.last().unwrap(), node); node = internal_node; } - node.into_ptr() + Ok(node) } fn common_prefix(a: &[u8], b: &[u8]) -> usize { diff --git a/libs/neonart/src/algorithm/lock_and_version.rs b/libs/neonart/src/algorithm/lock_and_version.rs index 94117cd531..d3829dbea4 100644 --- a/libs/neonart/src/algorithm/lock_and_version.rs +++ b/libs/neonart/src/algorithm/lock_and_version.rs @@ -1,5 +1,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; +pub(crate) struct ConcurrentUpdateError(); + pub(crate) struct AtomicLockAndVersion { inner: AtomicU64, } @@ -12,33 +14,30 @@ impl AtomicLockAndVersion { } } -pub(crate) type ResultOrRestart = Result; - -const fn restart() -> ResultOrRestart { - Err(()) -} - impl AtomicLockAndVersion { - pub(crate) fn read_lock_or_restart(&self) -> ResultOrRestart { + pub(crate) fn read_lock_or_restart(&self) -> Result { let version = self.await_node_unlocked(); if is_obsolete(version) { - return restart(); + return Err(ConcurrentUpdateError()); } Ok(version) } - pub(crate) fn check_or_restart(&self, version: u64) -> ResultOrRestart<()> { + pub(crate) fn check_or_restart(&self, version: u64) -> Result<(), ConcurrentUpdateError> { self.read_unlock_or_restart(version) } - pub(crate) fn read_unlock_or_restart(&self, version: u64) -> ResultOrRestart<()> { + pub(crate) fn read_unlock_or_restart(&self, version: u64) -> Result<(), ConcurrentUpdateError> { if self.inner.load(Ordering::Acquire) != version { - return restart(); + return Err(ConcurrentUpdateError()); } Ok(()) } - pub(crate) fn upgrade_to_write_lock_or_restart(&self, version: u64) -> ResultOrRestart<()> { + pub(crate) fn upgrade_to_write_lock_or_restart( + &self, + version: u64, + ) -> Result<(), ConcurrentUpdateError> { if self .inner .compare_exchange( @@ -49,7 +48,7 @@ impl AtomicLockAndVersion { ) .is_err() { - return restart(); + return Err(ConcurrentUpdateError()); } Ok(()) } diff --git a/libs/neonart/src/algorithm/node_ptr.rs b/libs/neonart/src/algorithm/node_ptr.rs index 9e0178a275..2f849323a0 100644 --- a/libs/neonart/src/algorithm/node_ptr.rs +++ b/libs/neonart/src/algorithm/node_ptr.rs @@ -404,21 +404,18 @@ impl NodePtr { } } - // FIXME - /* - pub(crate) fn deallocate(self, allocator: &impl ArtAllocator) { - match self.variant() { - NodeVariant::Internal4(_) => allocator.dealloc_node_internal4(self.ptr.cast()), - NodeVariant::Internal16(_) => allocator.dealloc_node_internal16(self.ptr.cast()), - NodeVariant::Internal48(_) => allocator.dealloc_node_internal48(self.ptr.cast()), - NodeVariant::Internal256(_) => allocator.dealloc_node_internal256(self.ptr.cast()), - NodeVariant::Leaf4(_) => allocator.dealloc_node_leaf4(self.ptr.cast()), - NodeVariant::Leaf16(_) => allocator.dealloc_node_leaf16(self.ptr.cast()), - NodeVariant::Leaf48(_) => allocator.dealloc_node_leaf48(self.ptr.cast()), - NodeVariant::Leaf256(_) => allocator.dealloc_node_leaf256(self.ptr.cast()), - } + pub(crate) fn deallocate(self, allocator: &impl ArtAllocator) { + match self.variant() { + NodeVariant::Internal4(_) => allocator.dealloc_node_internal4(self.ptr.cast()), + NodeVariant::Internal16(_) => allocator.dealloc_node_internal16(self.ptr.cast()), + NodeVariant::Internal48(_) => allocator.dealloc_node_internal48(self.ptr.cast()), + NodeVariant::Internal256(_) => allocator.dealloc_node_internal256(self.ptr.cast()), + NodeVariant::Leaf4(_) => allocator.dealloc_node_leaf4(self.ptr.cast()), + NodeVariant::Leaf16(_) => allocator.dealloc_node_leaf16(self.ptr.cast()), + NodeVariant::Leaf48(_) => allocator.dealloc_node_leaf48(self.ptr.cast()), + NodeVariant::Leaf256(_) => allocator.dealloc_node_leaf256(self.ptr.cast()), + } } - */ } pub fn new_root(allocator: &impl ArtAllocator) -> NodePtr { diff --git a/libs/neonart/src/algorithm/node_ref.rs b/libs/neonart/src/algorithm/node_ref.rs index 282f979f8f..1e92e283d3 100644 --- a/libs/neonart/src/algorithm/node_ref.rs +++ b/libs/neonart/src/algorithm/node_ref.rs @@ -1,14 +1,15 @@ use std::fmt::Debug; use std::marker::PhantomData; -use super::lock_and_version::ResultOrRestart; use super::node_ptr; use super::node_ptr::ChildOrValuePtr; use super::node_ptr::NodePtr; use crate::EpochPin; use crate::Value; use crate::algorithm::lock_and_version::AtomicLockAndVersion; +use crate::algorithm::lock_and_version::ConcurrentUpdateError; use crate::allocator::ArtAllocator; +use crate::allocator::OutOfMemoryError; pub struct NodeRef<'e, V> { ptr: NodePtr, @@ -30,7 +31,9 @@ impl<'e, V: Value> NodeRef<'e, V> { } } - pub(crate) fn read_lock_or_restart(&self) -> ResultOrRestart> { + pub(crate) fn read_lock_or_restart( + &self, + ) -> Result, ConcurrentUpdateError> { let version = self.lockword().read_lock_or_restart()?; Ok(ReadLockedNodeRef { ptr: self.ptr, @@ -78,7 +81,7 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> { pub(crate) fn find_child_or_value_or_restart( &self, key_byte: u8, - ) -> ResultOrRestart>> { + ) -> Result>, ConcurrentUpdateError> { let child_or_value = self.ptr.find_child_or_value(key_byte); self.ptr.lockword().check_or_restart(self.version)?; @@ -94,7 +97,7 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> { pub(crate) fn upgrade_to_write_lock_or_restart( self, - ) -> ResultOrRestart> { + ) -> Result, ConcurrentUpdateError> { self.ptr .lockword() .upgrade_to_write_lock_or_restart(self.version)?; @@ -105,7 +108,7 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> { }) } - pub(crate) fn read_unlock_or_restart(self) -> ResultOrRestart<()> { + pub(crate) fn read_unlock_or_restart(self) -> Result<(), ConcurrentUpdateError> { self.ptr.lockword().check_or_restart(self.version)?; Ok(()) } @@ -149,9 +152,20 @@ impl<'e, V: Value> WriteLockedNodeRef<'e, V> { self.ptr.insert_value(key_byte, value) } - pub(crate) fn grow(&self, allocator: &impl ArtAllocator) -> NewNodeRef { + pub(crate) fn grow<'a, A>( + &self, + allocator: &'a A, + ) -> Result, OutOfMemoryError> + where + A: ArtAllocator, + { + // FIXME: check OOM let new_node = self.ptr.grow(allocator); - NewNodeRef { ptr: new_node } + Ok(NewNodeRef { + ptr: new_node, + allocator, + extra_nodes: Vec::new(), + }) } pub(crate) fn as_ptr(&self) -> NodePtr { @@ -171,36 +185,85 @@ impl<'e, V> Drop for WriteLockedNodeRef<'e, V> { } } -pub(crate) struct NewNodeRef { +pub(crate) struct NewNodeRef<'a, V, A> +where + V: Value, + A: ArtAllocator, +{ ptr: NodePtr, + allocator: &'a A, + + extra_nodes: Vec>, } -impl NewNodeRef { - pub(crate) fn insert_child(&mut self, key_byte: u8, child: NodePtr) { - self.ptr.insert_child(key_byte, child) +impl<'a, V, A> NewNodeRef<'a, V, A> +where + V: Value, + A: ArtAllocator, +{ + pub(crate) fn insert_old_child(&mut self, key_byte: u8, child: &WriteLockedNodeRef) { + self.ptr.insert_child(key_byte, child.as_ptr()) } pub(crate) fn insert_value(&mut self, key_byte: u8, value: V) { self.ptr.insert_value(key_byte, value) } - pub(crate) fn into_ptr(self) -> NodePtr { + pub(crate) fn into_ptr(mut self) -> NodePtr { let ptr = self.ptr; + self.ptr = NodePtr::null(); ptr } + + pub(crate) fn insert_new_child(&mut self, key_byte: u8, child: NewNodeRef<'a, V, A>) { + let child_ptr = child.into_ptr(); + self.ptr.insert_child(key_byte, child_ptr); + self.extra_nodes.push(child_ptr); + } } -pub(crate) fn new_internal( +impl<'a, V, A> Drop for NewNodeRef<'a, V, A> +where + V: Value, + A: ArtAllocator, +{ + /// This drop implementation deallocates the newly allocated node, if into_ptr() was not called. + fn drop(&mut self) { + if !self.ptr.is_null() { + self.ptr.deallocate(self.allocator); + for p in self.extra_nodes.iter() { + p.deallocate(self.allocator); + } + } + } +} + +pub(crate) fn new_internal<'a, V, A>( prefix: &[u8], - allocator: &impl ArtAllocator, -) -> NewNodeRef { - NewNodeRef { + allocator: &'a A, +) -> Result, OutOfMemoryError> +where + V: Value, + A: ArtAllocator, +{ + Ok(NewNodeRef { ptr: node_ptr::new_internal(prefix, allocator), - } + allocator, + extra_nodes: Vec::new(), + }) } -pub(crate) fn new_leaf(prefix: &[u8], allocator: &impl ArtAllocator) -> NewNodeRef { - NewNodeRef { +pub(crate) fn new_leaf<'a, V, A>( + prefix: &[u8], + allocator: &'a A, +) -> Result, OutOfMemoryError> +where + V: Value, + A: ArtAllocator, +{ + Ok(NewNodeRef { ptr: node_ptr::new_leaf(prefix, allocator), - } + allocator, + extra_nodes: Vec::new(), + }) } diff --git a/libs/neonart/src/allocator.rs b/libs/neonart/src/allocator.rs index 1b3ba51cfb..641f8f2a29 100644 --- a/libs/neonart/src/allocator.rs +++ b/libs/neonart/src/allocator.rs @@ -15,6 +15,8 @@ pub use crate::algorithm::node_ptr::{ NodeLeaf48, NodeLeaf256, }; +pub struct OutOfMemoryError(); + pub trait ArtAllocator { fn alloc_tree(&self) -> *mut Tree; diff --git a/libs/neonart/src/epoch.rs b/libs/neonart/src/epoch.rs index 1fce1fbb0b..edea3e973d 100644 --- a/libs/neonart/src/epoch.rs +++ b/libs/neonart/src/epoch.rs @@ -59,7 +59,7 @@ impl EpochShared { (slot, epoch) } - fn advance(&self) -> u64 { + pub(crate) fn advance(&self) -> u64 { // Advance the global epoch let old_epoch = self.global_epoch.fetch_add(2, Ordering::Relaxed); let new_epoch = old_epoch + 2; @@ -68,7 +68,7 @@ impl EpochShared { new_epoch } - fn broadcast(&self) { + pub(crate) fn broadcast(&self) { let Some(_guard) = self.broadcast_lock.try_lock() else { return; }; @@ -90,7 +90,7 @@ impl EpochShared { // FIXME: memory fence here, since we used Relaxed? } - fn get_oldest(&self) -> u64 { + pub(crate) fn get_oldest(&self) -> u64 { // Read all slots. let now = self.global_epoch.load(Ordering::Relaxed); let mut oldest = now; @@ -111,7 +111,7 @@ impl EpochShared { pub(crate) struct EpochPin<'e> { slot: usize, - epoch: u64, + pub(crate) epoch: u64, handle: &'e LocalHandle<'e>, } diff --git a/libs/neonart/src/lib.rs b/libs/neonart/src/lib.rs index 1cc64a3bce..a3c4b879c5 100644 --- a/libs/neonart/src/lib.rs +++ b/libs/neonart/src/lib.rs @@ -126,6 +126,7 @@ pub mod allocator; mod epoch; use algorithm::RootPtr; +use algorithm::node_ptr::NodePtr; use std::fmt::Debug; use std::marker::PhantomData; @@ -154,17 +155,65 @@ pub trait Key: Clone + Debug { /// the old sticks around until all readers that might see the old value are gone. pub trait Value: Clone {} +const MAX_GARBAGE: usize = 1024; + pub struct Tree { root: RootPtr, writer_attached: AtomicBool, epoch: epoch::EpochShared, + + garbage: spin::Mutex>, } unsafe impl Sync for Tree {} unsafe impl Send for Tree {} +struct GarbageQueueFullError(); + +struct GarbageQueue { + slots: [(NodePtr, u64); MAX_GARBAGE], + front: usize, + back: usize, +} +impl GarbageQueue { + fn new() -> GarbageQueue { + GarbageQueue { + slots: [const { (NodePtr::null(), 0) }; MAX_GARBAGE], + front: 0, + back: 0, + } + } + + fn remember_obsolete_node( + &mut self, + ptr: NodePtr, + epoch: u64, + ) -> Result<(), GarbageQueueFullError> { + if self.front == self.back.wrapping_add(MAX_GARBAGE) { + return Err(GarbageQueueFullError()); + } + + self.slots[self.front % MAX_GARBAGE] = (ptr, epoch); + self.front = self.front.wrapping_add(1); + Ok(()) + } + + fn next_obsolete(&mut self, cutoff_epoch: u64) -> Option> { + if self.front == self.back { + return None; + } + let slot = &self.slots[self.back % MAX_GARBAGE]; + // FIXME: performing wrapping comparison + if slot.1 < cutoff_epoch { + self.back += 1; + return Some(slot.0); + } + None + } +} + /// Struct created at postmaster startup pub struct TreeInitStruct<'t, K: Key, V: Value, A: ArtAllocator> { tree: &'t Tree, @@ -211,6 +260,7 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator> TreeInitStruct<'t, K, V, root: algorithm::new_root(allocator), writer_attached: AtomicBool::new(false), epoch: epoch::EpochShared::new(), + garbage: spin::Mutex::new(GarbageQueue::new()), }; unsafe { tree_ptr.write(init) }; @@ -261,6 +311,18 @@ impl<'t, K: Key + Clone, V: Value, A: ArtAllocator> TreeWriteAccess<'t, K, V, phantom_key: PhantomData, } } + + pub fn collect_garbage(&'t self) { + self.tree.epoch.advance(); + self.tree.epoch.broadcast(); + + let cutoff_epoch = self.tree.epoch.get_oldest(); + + let mut garbage_queue = self.tree.garbage.lock(); + while let Some(ptr) = garbage_queue.next_obsolete(cutoff_epoch) { + ptr.deallocate(self.allocator); + } + } } impl<'t, K: Key + Clone, V: Value> TreeReadAccess<'t, K, V> { @@ -311,18 +373,19 @@ impl<'t, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'t, K, V, A> { where F: FnOnce(Option<&V>) -> Option, { - algorithm::update_fn( - key, - value_fn, - self.tree.root, - self.allocator, - &self.epoch_pin, - ) + algorithm::update_fn(key, value_fn, self.tree.root, self) } pub fn get(&mut self, key: &K) -> Option { algorithm::search(key, self.tree.root, &self.epoch_pin) } + + fn remember_obsolete_node(&'t self, ptr: NodePtr) -> Result<(), GarbageQueueFullError> { + self.tree + .garbage + .lock() + .remember_obsolete_node(ptr, self.epoch_pin.epoch) + } } impl<'t, K: Key, V: Value + Debug> TreeReadGuard<'t, K, V> {