mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 16:32:56 +00:00
Collect garbage, handle OOMs
This commit is contained in:
@@ -4,17 +4,44 @@ mod node_ref;
|
||||
|
||||
use std::vec::Vec;
|
||||
|
||||
use crate::algorithm::lock_and_version::ResultOrRestart;
|
||||
use crate::algorithm::node_ptr::{MAX_PREFIX_LEN, NodePtr};
|
||||
use crate::algorithm::lock_and_version::ConcurrentUpdateError;
|
||||
use crate::algorithm::node_ptr::MAX_PREFIX_LEN;
|
||||
use crate::algorithm::node_ref::ChildOrValue;
|
||||
use crate::algorithm::node_ref::{NodeRef, ReadLockedNodeRef, WriteLockedNodeRef};
|
||||
use crate::algorithm::node_ref::{NewNodeRef, NodeRef, ReadLockedNodeRef, WriteLockedNodeRef};
|
||||
use crate::allocator::OutOfMemoryError;
|
||||
|
||||
use crate::GarbageQueueFullError;
|
||||
use crate::TreeWriteGuard;
|
||||
use crate::allocator::ArtAllocator;
|
||||
use crate::epoch::EpochPin;
|
||||
use crate::{Key, Value};
|
||||
|
||||
pub(crate) type RootPtr<V> = node_ptr::NodePtr<V>;
|
||||
|
||||
pub enum ArtError {
|
||||
ConcurrentUpdate, // need to retry
|
||||
OutOfMemory,
|
||||
GarbageQueueFull,
|
||||
}
|
||||
|
||||
impl From<ConcurrentUpdateError> for ArtError {
|
||||
fn from(_: ConcurrentUpdateError) -> ArtError {
|
||||
ArtError::ConcurrentUpdate
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OutOfMemoryError> for ArtError {
|
||||
fn from(_: OutOfMemoryError) -> ArtError {
|
||||
ArtError::OutOfMemory
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GarbageQueueFullError> for ArtError {
|
||||
fn from(_: GarbageQueueFullError) -> ArtError {
|
||||
ArtError::GarbageQueueFull
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_root<V: Value>(allocator: &impl ArtAllocator<V>) -> RootPtr<V> {
|
||||
node_ptr::new_root(allocator)
|
||||
}
|
||||
@@ -33,12 +60,11 @@ pub(crate) fn search<'e, K: Key, V: Value>(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn update_fn<'e, K: Key, V: Value, F>(
|
||||
pub(crate) fn update_fn<'e, K: Key, V: Value, A: ArtAllocator<V>, F>(
|
||||
key: &K,
|
||||
value_fn: F,
|
||||
root: RootPtr<V>,
|
||||
allocator: &impl ArtAllocator<V>,
|
||||
epoch_pin: &'e EpochPin,
|
||||
guard: &'e TreeWriteGuard<K, V, A>,
|
||||
) where
|
||||
F: FnOnce(Option<&V>) -> Option<V>,
|
||||
{
|
||||
@@ -52,8 +78,7 @@ pub(crate) fn update_fn<'e, K: Key, V: Value, F>(
|
||||
this_value_fn,
|
||||
root_ref,
|
||||
None,
|
||||
allocator,
|
||||
epoch_pin,
|
||||
guard,
|
||||
0,
|
||||
key_bytes,
|
||||
) {
|
||||
@@ -77,7 +102,7 @@ fn lookup_recurse<'e, V: Value>(
|
||||
node: NodeRef<'e, V>,
|
||||
parent: Option<ReadLockedNodeRef<V>>,
|
||||
epoch_pin: &'e EpochPin,
|
||||
) -> ResultOrRestart<Option<V>> {
|
||||
) -> Result<Option<V>, ConcurrentUpdateError> {
|
||||
let rnode = node.read_lock_or_restart()?;
|
||||
if let Some(parent) = parent {
|
||||
parent.read_unlock_or_restart()?;
|
||||
@@ -107,16 +132,15 @@ fn lookup_recurse<'e, V: Value>(
|
||||
}
|
||||
|
||||
// This corresponds to the 'insertOpt' function in the paper
|
||||
pub(crate) fn update_recurse<'e, V: Value, F>(
|
||||
pub(crate) fn update_recurse<'e, K: Key, V: Value, A: ArtAllocator<V>, F>(
|
||||
key: &[u8],
|
||||
value_fn: F,
|
||||
node: NodeRef<'e, V>,
|
||||
rparent: Option<(ReadLockedNodeRef<V>, u8)>,
|
||||
allocator: &impl ArtAllocator<V>,
|
||||
epoch_pin: &'e EpochPin,
|
||||
guard: &'e TreeWriteGuard<K, V, A>,
|
||||
level: usize,
|
||||
orig_key: &[u8],
|
||||
) -> ResultOrRestart<()>
|
||||
) -> Result<(), ArtError>
|
||||
where
|
||||
F: FnOnce(Option<&V>) -> Option<V>,
|
||||
{
|
||||
@@ -129,14 +153,7 @@ where
|
||||
let mut wnode = rnode.upgrade_to_write_lock_or_restart()?;
|
||||
|
||||
if let Some(new_value) = value_fn(None) {
|
||||
insert_split_prefix(
|
||||
key,
|
||||
new_value,
|
||||
&mut wnode,
|
||||
&mut wparent,
|
||||
parent_key,
|
||||
allocator,
|
||||
);
|
||||
insert_split_prefix(key, new_value, &mut wnode, &mut wparent, parent_key, guard)?;
|
||||
}
|
||||
wnode.write_unlock();
|
||||
wparent.write_unlock();
|
||||
@@ -155,7 +172,7 @@ where
|
||||
let wnode = rnode.upgrade_to_write_lock_or_restart()?;
|
||||
|
||||
if let Some(new_value) = value_fn(None) {
|
||||
insert_and_grow(key, new_value, &wnode, &mut wparent, parent_key, allocator);
|
||||
insert_and_grow(key, new_value, &wnode, &mut wparent, parent_key, guard)?;
|
||||
wnode.write_unlock_obsolete();
|
||||
wparent.write_unlock();
|
||||
} else {
|
||||
@@ -168,7 +185,7 @@ where
|
||||
rparent.read_unlock_or_restart()?;
|
||||
}
|
||||
if let Some(new_value) = value_fn(None) {
|
||||
insert_to_node(&mut wnode, key, new_value, allocator);
|
||||
insert_to_node(&mut wnode, key, new_value, guard)?;
|
||||
}
|
||||
wnode.write_unlock();
|
||||
}
|
||||
@@ -203,8 +220,7 @@ where
|
||||
value_fn,
|
||||
next_child,
|
||||
Some((rnode, key[0])),
|
||||
allocator,
|
||||
epoch_pin,
|
||||
guard,
|
||||
level + 1,
|
||||
orig_key,
|
||||
)
|
||||
@@ -233,7 +249,7 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>(
|
||||
node: NodeRef<'e, V>,
|
||||
epoch_pin: &'e EpochPin,
|
||||
level: usize,
|
||||
) -> ResultOrRestart<()> {
|
||||
) -> Result<(), ConcurrentUpdateError> {
|
||||
let indent = str::repeat(" ", level);
|
||||
|
||||
let rnode = node.read_lock_or_restart()?;
|
||||
@@ -278,81 +294,92 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>(
|
||||
/// [foo]b -> [a]r -> value
|
||||
/// e -> [ls]e -> value
|
||||
///```
|
||||
fn insert_split_prefix<'a, V: Value>(
|
||||
fn insert_split_prefix<'e, K: Key, V: Value, A: ArtAllocator<V>>(
|
||||
key: &[u8],
|
||||
value: V,
|
||||
node: &mut WriteLockedNodeRef<V>,
|
||||
parent: &mut WriteLockedNodeRef<V>,
|
||||
parent_key: u8,
|
||||
allocator: &impl ArtAllocator<V>,
|
||||
) {
|
||||
guard: &'e TreeWriteGuard<K, V, A>,
|
||||
) -> Result<(), OutOfMemoryError> {
|
||||
let old_node = node;
|
||||
let old_prefix = old_node.get_prefix();
|
||||
let common_prefix_len = common_prefix(key, old_prefix);
|
||||
|
||||
// Allocate a node for the new value.
|
||||
let new_value_node = allocate_node_for_value(&key[common_prefix_len + 1..], value, allocator);
|
||||
let new_value_node =
|
||||
allocate_node_for_value(&key[common_prefix_len + 1..], value, guard.allocator)?;
|
||||
|
||||
// Allocate a new internal node with the common prefix
|
||||
let mut prefix_node = node_ref::new_internal(&key[..common_prefix_len], allocator);
|
||||
// FIXME: deallocate 'new_value_node' on OOM
|
||||
let mut prefix_node = node_ref::new_internal(&key[..common_prefix_len], guard.allocator)?;
|
||||
|
||||
// Add the old node and the new nodes to the new internal node
|
||||
prefix_node.insert_child(old_prefix[common_prefix_len], old_node.as_ptr());
|
||||
prefix_node.insert_child(key[common_prefix_len], new_value_node);
|
||||
prefix_node.insert_old_child(old_prefix[common_prefix_len], old_node);
|
||||
prefix_node.insert_new_child(key[common_prefix_len], new_value_node);
|
||||
|
||||
// Modify the prefix of the old child in place
|
||||
old_node.truncate_prefix(old_prefix.len() - common_prefix_len - 1);
|
||||
|
||||
// replace the pointer in the parent
|
||||
parent.replace_child(parent_key, prefix_node.into_ptr());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn insert_to_node<V: Value>(
|
||||
fn insert_to_node<'e, K: Key, V: Value, A: ArtAllocator<V>>(
|
||||
wnode: &mut WriteLockedNodeRef<V>,
|
||||
key: &[u8],
|
||||
value: V,
|
||||
allocator: &impl ArtAllocator<V>,
|
||||
) {
|
||||
guard: &'e TreeWriteGuard<K, V, A>,
|
||||
) -> Result<(), OutOfMemoryError> {
|
||||
if wnode.is_leaf() {
|
||||
wnode.insert_value(key[0], value);
|
||||
} else {
|
||||
let value_child = allocate_node_for_value(&key[1..], value, allocator);
|
||||
wnode.insert_child(key[0], value_child);
|
||||
let value_child = allocate_node_for_value(&key[1..], value, guard.allocator)?;
|
||||
wnode.insert_child(key[0], value_child.into_ptr());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// On entry: 'parent' and 'node' are locked
|
||||
fn insert_and_grow<V: Value>(
|
||||
fn insert_and_grow<'e, K: Key, V: Value, A: ArtAllocator<V>>(
|
||||
key: &[u8],
|
||||
value: V,
|
||||
wnode: &WriteLockedNodeRef<V>,
|
||||
parent: &mut WriteLockedNodeRef<V>,
|
||||
parent_key_byte: u8,
|
||||
allocator: &impl ArtAllocator<V>,
|
||||
) {
|
||||
let mut bigger_node = wnode.grow(allocator);
|
||||
guard: &'e TreeWriteGuard<K, V, A>,
|
||||
) -> Result<(), ArtError> {
|
||||
let mut bigger_node = wnode.grow(guard.allocator)?;
|
||||
|
||||
if wnode.is_leaf() {
|
||||
bigger_node.insert_value(key[0], value);
|
||||
} else {
|
||||
let value_child = allocate_node_for_value(&key[1..], value, allocator);
|
||||
bigger_node.insert_child(key[0], value_child);
|
||||
// FIXME: deallocate 'bigger_node' on OOM
|
||||
let value_child = allocate_node_for_value(&key[1..], value, guard.allocator)?;
|
||||
bigger_node.insert_new_child(key[0], value_child);
|
||||
}
|
||||
|
||||
// Replace the pointer in the parent
|
||||
parent.replace_child(parent_key_byte, bigger_node.into_ptr());
|
||||
|
||||
// FIXME: if this errors out, deallocate stuff we already allocated
|
||||
guard.remember_obsolete_node(wnode.as_ptr())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Allocate a new leaf node to hold 'value'. If key is long, we may need to allocate
|
||||
// new internal nodes to hold it too
|
||||
fn allocate_node_for_value<V: Value>(
|
||||
fn allocate_node_for_value<'a, V: Value, A: ArtAllocator<V>>(
|
||||
key: &[u8],
|
||||
value: V,
|
||||
allocator: &impl ArtAllocator<V>,
|
||||
) -> NodePtr<V> {
|
||||
allocator: &'a A,
|
||||
) -> Result<NewNodeRef<'a, V, A>, OutOfMemoryError> {
|
||||
let mut prefix_off = key.len().saturating_sub(MAX_PREFIX_LEN + 1);
|
||||
|
||||
let mut leaf_node = node_ref::new_leaf(&key[prefix_off..key.len() - 1], allocator);
|
||||
let mut leaf_node = node_ref::new_leaf(&key[prefix_off..key.len() - 1], allocator)?;
|
||||
leaf_node.insert_value(*key.last().unwrap(), value);
|
||||
|
||||
let mut node = leaf_node;
|
||||
@@ -364,12 +391,12 @@ fn allocate_node_for_value<V: Value>(
|
||||
let mut internal_node = node_ref::new_internal(
|
||||
&remain_prefix[prefix_off..remain_prefix.len() - 1],
|
||||
allocator,
|
||||
);
|
||||
internal_node.insert_child(*remain_prefix.last().unwrap(), node.into_ptr());
|
||||
)?;
|
||||
internal_node.insert_new_child(*remain_prefix.last().unwrap(), node);
|
||||
node = internal_node;
|
||||
}
|
||||
|
||||
node.into_ptr()
|
||||
Ok(node)
|
||||
}
|
||||
|
||||
fn common_prefix(a: &[u8], b: &[u8]) -> usize {
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
pub(crate) struct ConcurrentUpdateError();
|
||||
|
||||
pub(crate) struct AtomicLockAndVersion {
|
||||
inner: AtomicU64,
|
||||
}
|
||||
@@ -12,33 +14,30 @@ impl AtomicLockAndVersion {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type ResultOrRestart<T> = Result<T, ()>;
|
||||
|
||||
const fn restart<T>() -> ResultOrRestart<T> {
|
||||
Err(())
|
||||
}
|
||||
|
||||
impl AtomicLockAndVersion {
|
||||
pub(crate) fn read_lock_or_restart(&self) -> ResultOrRestart<u64> {
|
||||
pub(crate) fn read_lock_or_restart(&self) -> Result<u64, ConcurrentUpdateError> {
|
||||
let version = self.await_node_unlocked();
|
||||
if is_obsolete(version) {
|
||||
return restart();
|
||||
return Err(ConcurrentUpdateError());
|
||||
}
|
||||
Ok(version)
|
||||
}
|
||||
|
||||
pub(crate) fn check_or_restart(&self, version: u64) -> ResultOrRestart<()> {
|
||||
pub(crate) fn check_or_restart(&self, version: u64) -> Result<(), ConcurrentUpdateError> {
|
||||
self.read_unlock_or_restart(version)
|
||||
}
|
||||
|
||||
pub(crate) fn read_unlock_or_restart(&self, version: u64) -> ResultOrRestart<()> {
|
||||
pub(crate) fn read_unlock_or_restart(&self, version: u64) -> Result<(), ConcurrentUpdateError> {
|
||||
if self.inner.load(Ordering::Acquire) != version {
|
||||
return restart();
|
||||
return Err(ConcurrentUpdateError());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn upgrade_to_write_lock_or_restart(&self, version: u64) -> ResultOrRestart<()> {
|
||||
pub(crate) fn upgrade_to_write_lock_or_restart(
|
||||
&self,
|
||||
version: u64,
|
||||
) -> Result<(), ConcurrentUpdateError> {
|
||||
if self
|
||||
.inner
|
||||
.compare_exchange(
|
||||
@@ -49,7 +48,7 @@ impl AtomicLockAndVersion {
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
return restart();
|
||||
return Err(ConcurrentUpdateError());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -404,21 +404,18 @@ impl<V: Value> NodePtr<V> {
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME
|
||||
/*
|
||||
pub(crate) fn deallocate(self, allocator: &impl ArtAllocator<V>) {
|
||||
match self.variant() {
|
||||
NodeVariant::Internal4(_) => allocator.dealloc_node_internal4(self.ptr.cast()),
|
||||
NodeVariant::Internal16(_) => allocator.dealloc_node_internal16(self.ptr.cast()),
|
||||
NodeVariant::Internal48(_) => allocator.dealloc_node_internal48(self.ptr.cast()),
|
||||
NodeVariant::Internal256(_) => allocator.dealloc_node_internal256(self.ptr.cast()),
|
||||
NodeVariant::Leaf4(_) => allocator.dealloc_node_leaf4(self.ptr.cast()),
|
||||
NodeVariant::Leaf16(_) => allocator.dealloc_node_leaf16(self.ptr.cast()),
|
||||
NodeVariant::Leaf48(_) => allocator.dealloc_node_leaf48(self.ptr.cast()),
|
||||
NodeVariant::Leaf256(_) => allocator.dealloc_node_leaf256(self.ptr.cast()),
|
||||
}
|
||||
pub(crate) fn deallocate(self, allocator: &impl ArtAllocator<V>) {
|
||||
match self.variant() {
|
||||
NodeVariant::Internal4(_) => allocator.dealloc_node_internal4(self.ptr.cast()),
|
||||
NodeVariant::Internal16(_) => allocator.dealloc_node_internal16(self.ptr.cast()),
|
||||
NodeVariant::Internal48(_) => allocator.dealloc_node_internal48(self.ptr.cast()),
|
||||
NodeVariant::Internal256(_) => allocator.dealloc_node_internal256(self.ptr.cast()),
|
||||
NodeVariant::Leaf4(_) => allocator.dealloc_node_leaf4(self.ptr.cast()),
|
||||
NodeVariant::Leaf16(_) => allocator.dealloc_node_leaf16(self.ptr.cast()),
|
||||
NodeVariant::Leaf48(_) => allocator.dealloc_node_leaf48(self.ptr.cast()),
|
||||
NodeVariant::Leaf256(_) => allocator.dealloc_node_leaf256(self.ptr.cast()),
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
pub fn new_root<V: Value>(allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
use std::fmt::Debug;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use super::lock_and_version::ResultOrRestart;
|
||||
use super::node_ptr;
|
||||
use super::node_ptr::ChildOrValuePtr;
|
||||
use super::node_ptr::NodePtr;
|
||||
use crate::EpochPin;
|
||||
use crate::Value;
|
||||
use crate::algorithm::lock_and_version::AtomicLockAndVersion;
|
||||
use crate::algorithm::lock_and_version::ConcurrentUpdateError;
|
||||
use crate::allocator::ArtAllocator;
|
||||
use crate::allocator::OutOfMemoryError;
|
||||
|
||||
pub struct NodeRef<'e, V> {
|
||||
ptr: NodePtr<V>,
|
||||
@@ -30,7 +31,9 @@ impl<'e, V: Value> NodeRef<'e, V> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn read_lock_or_restart(&self) -> ResultOrRestart<ReadLockedNodeRef<'e, V>> {
|
||||
pub(crate) fn read_lock_or_restart(
|
||||
&self,
|
||||
) -> Result<ReadLockedNodeRef<'e, V>, ConcurrentUpdateError> {
|
||||
let version = self.lockword().read_lock_or_restart()?;
|
||||
Ok(ReadLockedNodeRef {
|
||||
ptr: self.ptr,
|
||||
@@ -78,7 +81,7 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> {
|
||||
pub(crate) fn find_child_or_value_or_restart(
|
||||
&self,
|
||||
key_byte: u8,
|
||||
) -> ResultOrRestart<Option<ChildOrValue<'e, V>>> {
|
||||
) -> Result<Option<ChildOrValue<'e, V>>, ConcurrentUpdateError> {
|
||||
let child_or_value = self.ptr.find_child_or_value(key_byte);
|
||||
self.ptr.lockword().check_or_restart(self.version)?;
|
||||
|
||||
@@ -94,7 +97,7 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> {
|
||||
|
||||
pub(crate) fn upgrade_to_write_lock_or_restart(
|
||||
self,
|
||||
) -> ResultOrRestart<WriteLockedNodeRef<'e, V>> {
|
||||
) -> Result<WriteLockedNodeRef<'e, V>, ConcurrentUpdateError> {
|
||||
self.ptr
|
||||
.lockword()
|
||||
.upgrade_to_write_lock_or_restart(self.version)?;
|
||||
@@ -105,7 +108,7 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn read_unlock_or_restart(self) -> ResultOrRestart<()> {
|
||||
pub(crate) fn read_unlock_or_restart(self) -> Result<(), ConcurrentUpdateError> {
|
||||
self.ptr.lockword().check_or_restart(self.version)?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -149,9 +152,20 @@ impl<'e, V: Value> WriteLockedNodeRef<'e, V> {
|
||||
self.ptr.insert_value(key_byte, value)
|
||||
}
|
||||
|
||||
pub(crate) fn grow(&self, allocator: &impl ArtAllocator<V>) -> NewNodeRef<V> {
|
||||
pub(crate) fn grow<'a, A>(
|
||||
&self,
|
||||
allocator: &'a A,
|
||||
) -> Result<NewNodeRef<'a, V, A>, OutOfMemoryError>
|
||||
where
|
||||
A: ArtAllocator<V>,
|
||||
{
|
||||
// FIXME: check OOM
|
||||
let new_node = self.ptr.grow(allocator);
|
||||
NewNodeRef { ptr: new_node }
|
||||
Ok(NewNodeRef {
|
||||
ptr: new_node,
|
||||
allocator,
|
||||
extra_nodes: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn as_ptr(&self) -> NodePtr<V> {
|
||||
@@ -171,36 +185,85 @@ impl<'e, V> Drop for WriteLockedNodeRef<'e, V> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct NewNodeRef<V> {
|
||||
pub(crate) struct NewNodeRef<'a, V, A>
|
||||
where
|
||||
V: Value,
|
||||
A: ArtAllocator<V>,
|
||||
{
|
||||
ptr: NodePtr<V>,
|
||||
allocator: &'a A,
|
||||
|
||||
extra_nodes: Vec<NodePtr<V>>,
|
||||
}
|
||||
|
||||
impl<V: Value> NewNodeRef<V> {
|
||||
pub(crate) fn insert_child(&mut self, key_byte: u8, child: NodePtr<V>) {
|
||||
self.ptr.insert_child(key_byte, child)
|
||||
impl<'a, V, A> NewNodeRef<'a, V, A>
|
||||
where
|
||||
V: Value,
|
||||
A: ArtAllocator<V>,
|
||||
{
|
||||
pub(crate) fn insert_old_child(&mut self, key_byte: u8, child: &WriteLockedNodeRef<V>) {
|
||||
self.ptr.insert_child(key_byte, child.as_ptr())
|
||||
}
|
||||
|
||||
pub(crate) fn insert_value(&mut self, key_byte: u8, value: V) {
|
||||
self.ptr.insert_value(key_byte, value)
|
||||
}
|
||||
|
||||
pub(crate) fn into_ptr(self) -> NodePtr<V> {
|
||||
pub(crate) fn into_ptr(mut self) -> NodePtr<V> {
|
||||
let ptr = self.ptr;
|
||||
self.ptr = NodePtr::null();
|
||||
ptr
|
||||
}
|
||||
|
||||
pub(crate) fn insert_new_child(&mut self, key_byte: u8, child: NewNodeRef<'a, V, A>) {
|
||||
let child_ptr = child.into_ptr();
|
||||
self.ptr.insert_child(key_byte, child_ptr);
|
||||
self.extra_nodes.push(child_ptr);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new_internal<V: Value>(
|
||||
impl<'a, V, A> Drop for NewNodeRef<'a, V, A>
|
||||
where
|
||||
V: Value,
|
||||
A: ArtAllocator<V>,
|
||||
{
|
||||
/// This drop implementation deallocates the newly allocated node, if into_ptr() was not called.
|
||||
fn drop(&mut self) {
|
||||
if !self.ptr.is_null() {
|
||||
self.ptr.deallocate(self.allocator);
|
||||
for p in self.extra_nodes.iter() {
|
||||
p.deallocate(self.allocator);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new_internal<'a, V, A>(
|
||||
prefix: &[u8],
|
||||
allocator: &impl ArtAllocator<V>,
|
||||
) -> NewNodeRef<V> {
|
||||
NewNodeRef {
|
||||
allocator: &'a A,
|
||||
) -> Result<NewNodeRef<'a, V, A>, OutOfMemoryError>
|
||||
where
|
||||
V: Value,
|
||||
A: ArtAllocator<V>,
|
||||
{
|
||||
Ok(NewNodeRef {
|
||||
ptr: node_ptr::new_internal(prefix, allocator),
|
||||
}
|
||||
allocator,
|
||||
extra_nodes: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn new_leaf<V: Value>(prefix: &[u8], allocator: &impl ArtAllocator<V>) -> NewNodeRef<V> {
|
||||
NewNodeRef {
|
||||
pub(crate) fn new_leaf<'a, V, A>(
|
||||
prefix: &[u8],
|
||||
allocator: &'a A,
|
||||
) -> Result<NewNodeRef<'a, V, A>, OutOfMemoryError>
|
||||
where
|
||||
V: Value,
|
||||
A: ArtAllocator<V>,
|
||||
{
|
||||
Ok(NewNodeRef {
|
||||
ptr: node_ptr::new_leaf(prefix, allocator),
|
||||
}
|
||||
allocator,
|
||||
extra_nodes: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -15,6 +15,8 @@ pub use crate::algorithm::node_ptr::{
|
||||
NodeLeaf48, NodeLeaf256,
|
||||
};
|
||||
|
||||
pub struct OutOfMemoryError();
|
||||
|
||||
pub trait ArtAllocator<V: crate::Value> {
|
||||
fn alloc_tree(&self) -> *mut Tree<V>;
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ impl EpochShared {
|
||||
(slot, epoch)
|
||||
}
|
||||
|
||||
fn advance(&self) -> u64 {
|
||||
pub(crate) fn advance(&self) -> u64 {
|
||||
// Advance the global epoch
|
||||
let old_epoch = self.global_epoch.fetch_add(2, Ordering::Relaxed);
|
||||
let new_epoch = old_epoch + 2;
|
||||
@@ -68,7 +68,7 @@ impl EpochShared {
|
||||
new_epoch
|
||||
}
|
||||
|
||||
fn broadcast(&self) {
|
||||
pub(crate) fn broadcast(&self) {
|
||||
let Some(_guard) = self.broadcast_lock.try_lock() else {
|
||||
return;
|
||||
};
|
||||
@@ -90,7 +90,7 @@ impl EpochShared {
|
||||
// FIXME: memory fence here, since we used Relaxed?
|
||||
}
|
||||
|
||||
fn get_oldest(&self) -> u64 {
|
||||
pub(crate) fn get_oldest(&self) -> u64 {
|
||||
// Read all slots.
|
||||
let now = self.global_epoch.load(Ordering::Relaxed);
|
||||
let mut oldest = now;
|
||||
@@ -111,7 +111,7 @@ impl EpochShared {
|
||||
|
||||
pub(crate) struct EpochPin<'e> {
|
||||
slot: usize,
|
||||
epoch: u64,
|
||||
pub(crate) epoch: u64,
|
||||
|
||||
handle: &'e LocalHandle<'e>,
|
||||
}
|
||||
|
||||
@@ -126,6 +126,7 @@ pub mod allocator;
|
||||
mod epoch;
|
||||
|
||||
use algorithm::RootPtr;
|
||||
use algorithm::node_ptr::NodePtr;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::marker::PhantomData;
|
||||
@@ -154,17 +155,65 @@ pub trait Key: Clone + Debug {
|
||||
/// the old sticks around until all readers that might see the old value are gone.
|
||||
pub trait Value: Clone {}
|
||||
|
||||
const MAX_GARBAGE: usize = 1024;
|
||||
|
||||
pub struct Tree<V: Value> {
|
||||
root: RootPtr<V>,
|
||||
|
||||
writer_attached: AtomicBool,
|
||||
|
||||
epoch: epoch::EpochShared,
|
||||
|
||||
garbage: spin::Mutex<GarbageQueue<V>>,
|
||||
}
|
||||
|
||||
unsafe impl<V: Value + Sync> Sync for Tree<V> {}
|
||||
unsafe impl<V: Value + Send> Send for Tree<V> {}
|
||||
|
||||
struct GarbageQueueFullError();
|
||||
|
||||
struct GarbageQueue<V> {
|
||||
slots: [(NodePtr<V>, u64); MAX_GARBAGE],
|
||||
front: usize,
|
||||
back: usize,
|
||||
}
|
||||
impl<V> GarbageQueue<V> {
|
||||
fn new() -> GarbageQueue<V> {
|
||||
GarbageQueue {
|
||||
slots: [const { (NodePtr::null(), 0) }; MAX_GARBAGE],
|
||||
front: 0,
|
||||
back: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn remember_obsolete_node(
|
||||
&mut self,
|
||||
ptr: NodePtr<V>,
|
||||
epoch: u64,
|
||||
) -> Result<(), GarbageQueueFullError> {
|
||||
if self.front == self.back.wrapping_add(MAX_GARBAGE) {
|
||||
return Err(GarbageQueueFullError());
|
||||
}
|
||||
|
||||
self.slots[self.front % MAX_GARBAGE] = (ptr, epoch);
|
||||
self.front = self.front.wrapping_add(1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn next_obsolete(&mut self, cutoff_epoch: u64) -> Option<NodePtr<V>> {
|
||||
if self.front == self.back {
|
||||
return None;
|
||||
}
|
||||
let slot = &self.slots[self.back % MAX_GARBAGE];
|
||||
// FIXME: performing wrapping comparison
|
||||
if slot.1 < cutoff_epoch {
|
||||
self.back += 1;
|
||||
return Some(slot.0);
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Struct created at postmaster startup
|
||||
pub struct TreeInitStruct<'t, K: Key, V: Value, A: ArtAllocator<V>> {
|
||||
tree: &'t Tree<V>,
|
||||
@@ -211,6 +260,7 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator<V>> TreeInitStruct<'t, K, V,
|
||||
root: algorithm::new_root(allocator),
|
||||
writer_attached: AtomicBool::new(false),
|
||||
epoch: epoch::EpochShared::new(),
|
||||
garbage: spin::Mutex::new(GarbageQueue::new()),
|
||||
};
|
||||
unsafe { tree_ptr.write(init) };
|
||||
|
||||
@@ -261,6 +311,18 @@ impl<'t, K: Key + Clone, V: Value, A: ArtAllocator<V>> TreeWriteAccess<'t, K, V,
|
||||
phantom_key: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn collect_garbage(&'t self) {
|
||||
self.tree.epoch.advance();
|
||||
self.tree.epoch.broadcast();
|
||||
|
||||
let cutoff_epoch = self.tree.epoch.get_oldest();
|
||||
|
||||
let mut garbage_queue = self.tree.garbage.lock();
|
||||
while let Some(ptr) = garbage_queue.next_obsolete(cutoff_epoch) {
|
||||
ptr.deallocate(self.allocator);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'t, K: Key + Clone, V: Value> TreeReadAccess<'t, K, V> {
|
||||
@@ -311,18 +373,19 @@ impl<'t, K: Key, V: Value, A: ArtAllocator<V>> TreeWriteGuard<'t, K, V, A> {
|
||||
where
|
||||
F: FnOnce(Option<&V>) -> Option<V>,
|
||||
{
|
||||
algorithm::update_fn(
|
||||
key,
|
||||
value_fn,
|
||||
self.tree.root,
|
||||
self.allocator,
|
||||
&self.epoch_pin,
|
||||
)
|
||||
algorithm::update_fn(key, value_fn, self.tree.root, self)
|
||||
}
|
||||
|
||||
pub fn get(&mut self, key: &K) -> Option<V> {
|
||||
algorithm::search(key, self.tree.root, &self.epoch_pin)
|
||||
}
|
||||
|
||||
fn remember_obsolete_node(&'t self, ptr: NodePtr<V>) -> Result<(), GarbageQueueFullError> {
|
||||
self.tree
|
||||
.garbage
|
||||
.lock()
|
||||
.remember_obsolete_node(ptr, self.epoch_pin.epoch)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'t, K: Key, V: Value + Debug> TreeReadGuard<'t, K, V> {
|
||||
|
||||
Reference in New Issue
Block a user