fix concurrency issues with the LFC

- Add another locking hash table to track which cached pages are currently being
  modified, by smgrwrite() or smgrread() or by prefetch.

- Use single-value Leaf pages in the art tree. That seems simpler after all,
  and it eliminates some corner cases where a Value needed to be cloned, which
  made it tricky to use atomics or other interior mutability on the Values
This commit is contained in:
Heikki Linnakangas
2025-05-10 02:36:48 +03:00
parent 0c25ea9e31
commit e6a4171fa1
14 changed files with 728 additions and 861 deletions

1
Cargo.lock generated
View File

@@ -1349,6 +1349,7 @@ dependencies = [
"axum 0.8.1",
"bytes",
"cbindgen",
"clashmap",
"http 1.1.0",
"libc",
"metrics",

View File

@@ -6,12 +6,12 @@ use std::vec::Vec;
use crate::algorithm::lock_and_version::ConcurrentUpdateError;
use crate::algorithm::node_ptr::MAX_PREFIX_LEN;
use crate::algorithm::node_ref::ChildOrValue;
use crate::algorithm::node_ref::{NewNodeRef, NodeRef, ReadLockedNodeRef, WriteLockedNodeRef};
use crate::allocator::OutOfMemoryError;
use crate::GarbageQueueFullError;
use crate::TreeWriteGuard;
use crate::UpdateAction;
use crate::allocator::ArtAllocator;
use crate::epoch::EpochPin;
use crate::{Key, Value};
@@ -89,7 +89,7 @@ pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>, F>(
root: RootPtr<V>,
guard: &'g mut TreeWriteGuard<'e, K, V, A>,
) where
F: FnOnce(Option<&V>) -> Option<V>,
F: FnOnce(Option<&V>) -> UpdateAction<V>,
{
let value_fn_cell = std::cell::Cell::new(Some(value_fn));
loop {
@@ -108,7 +108,6 @@ pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>, F>(
) {
Ok(()) => break,
Err(ArtError::ConcurrentUpdate) => {
eprintln!("retrying");
continue; // retry
}
Err(ArtError::OutOfMemory) => {
@@ -150,21 +149,25 @@ fn lookup_recurse<'e, V: Value>(
rnode.read_unlock_or_restart()?;
return Ok(None);
};
if rnode.is_leaf() {
assert_eq!(key.len(), prefix_len);
let vptr = rnode.get_leaf_value_ptr()?;
// safety: It's OK to return a ref of the pointer because we checked the version
// and the lifetime of 'epoch_pin' enforces that the reference is only accessible
// as long as the epoch is pinned.
let v = unsafe { vptr.as_ref().unwrap() };
return Ok(Some(v));
}
let key = &key[prefix_len..];
// find child (or leaf value)
let next_node = rnode.find_child_or_value_or_restart(key[0])?;
let next_node = rnode.find_child_or_restart(key[0])?;
match next_node {
None => Ok(None), // key not found
Some(ChildOrValue::Value(vptr)) => {
// safety: It's OK to return a ref of the pointer because we checked the version
// and the lifetime of 'epoch_pin' enforces that the reference is only accessible
// as long as the epoch is pinned.
let v = unsafe { vptr.as_ref().unwrap() };
Ok(Some(v))
}
Some(ChildOrValue::Child(v)) => lookup_recurse(&key[1..], v, Some(rnode), epoch_pin),
Some(child) => lookup_recurse(&key[1..], child, Some(rnode), epoch_pin),
}
}
@@ -179,23 +182,36 @@ fn next_recurse<'e, V: Value>(
if prefix.len() != 0 {
path.extend_from_slice(prefix);
}
assert!(path.len() < min_key.len());
use std::cmp::Ordering;
let mut min_key_byte = match path.as_slice().cmp(&min_key[0..path.len()]) {
Ordering::Less => {
rnode.read_unlock_or_restart()?;
return Ok(None);
}
let comparison = path.as_slice().cmp(&min_key[0..path.len()]);
if comparison == Ordering::Less {
rnode.read_unlock_or_restart()?;
return Ok(None);
}
if rnode.is_leaf() {
assert_eq!(path.len(), min_key.len());
let vptr = rnode.get_leaf_value_ptr()?;
// safety: It's OK to return a ref of the pointer because we checked the version
// and the lifetime of 'epoch_pin' enforces that the reference is only accessible
// as long as the epoch is pinned.
let v = unsafe { vptr.as_ref().unwrap() };
return Ok(Some(v));
}
let mut min_key_byte = match comparison {
Ordering::Less => unreachable!(), // checked this above already
Ordering::Equal => min_key[path.len()],
Ordering::Greater => 0,
};
loop {
match rnode.find_next_child_or_value_or_restart(min_key_byte)? {
match rnode.find_next_child_or_restart(min_key_byte)? {
None => {
return Ok(None);
}
Some((key_byte, ChildOrValue::Child(child_ref))) => {
Some((key_byte, child_ref)) => {
let path_len = path.len();
path.push(key_byte);
let result = next_recurse(min_key, path, child_ref, epoch_pin)?;
@@ -208,15 +224,6 @@ fn next_recurse<'e, V: Value>(
path.truncate(path_len);
min_key_byte = key_byte + 1;
}
Some((key_byte, ChildOrValue::Value(vptr))) => {
path.push(key_byte);
assert_eq!(path.len(), min_key.len());
// safety: It's OK to return a ref of the pointer because we checked the version
// and the lifetime of 'epoch_pin' enforces that the reference is only accessible
// as long as the epoch is pinned.
let v = unsafe { vptr.as_ref().unwrap() };
return Ok(Some(v));
}
}
}
}
@@ -232,7 +239,7 @@ pub(crate) fn update_recurse<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>, F>(
orig_key: &[u8],
) -> Result<(), ArtError>
where
F: FnOnce(Option<&V>) -> Option<V>,
F: FnOnce(Option<&V>) -> UpdateAction<V>,
{
let rnode = node.read_lock_or_restart()?;
@@ -242,8 +249,14 @@ where
let mut wparent = rparent.upgrade_to_write_lock_or_restart()?;
let mut wnode = rnode.upgrade_to_write_lock_or_restart()?;
if let Some(new_value) = value_fn(None) {
insert_split_prefix(key, new_value, &mut wnode, &mut wparent, parent_key, guard)?;
match value_fn(None) {
UpdateAction::Nothing => {}
UpdateAction::Insert(new_value) => {
insert_split_prefix(key, new_value, &mut wnode, &mut wparent, parent_key, guard)?;
}
UpdateAction::Remove => {
panic!("unexpected Remove action on insertion");
}
}
wnode.write_unlock();
wparent.write_unlock();
@@ -253,7 +266,34 @@ where
let key = &key[prefix_match_len as usize..];
let level = level + prefix_match_len as usize;
let next_node = rnode.find_child_or_value_or_restart(key[0])?;
if rnode.is_leaf() {
assert_eq!(key.len(), 0);
let (rparent, parent_key) = rparent.expect("root cannot be leaf");
let mut wparent = rparent.upgrade_to_write_lock_or_restart()?;
let mut wnode = rnode.upgrade_to_write_lock_or_restart()?;
// safety: Now that we have acquired the write lock, we have exclusive access to the
// value. XXX: There might be concurrent reads though?
let value_mut = wnode.get_leaf_value_mut();
match value_fn(Some(value_mut)) {
UpdateAction::Nothing => {}
UpdateAction::Insert(_) => panic!("cannot insert over existing value"),
UpdateAction::Remove => {
// TODO: Shrink the node
// TODO: If the parent becomes empty, unlink it from grandparent
// TODO: If parent has only one child left, merge it with the child, extending its
// prefix
wparent.delete_child(parent_key);
}
}
wnode.write_unlock();
wparent.write_unlock();
return Ok(());
}
let next_node = rnode.find_child_or_restart(key[0])?;
if next_node.is_none() {
if rnode.is_full() {
@@ -261,63 +301,53 @@ where
let mut wparent = rparent.upgrade_to_write_lock_or_restart()?;
let wnode = rnode.upgrade_to_write_lock_or_restart()?;
if let Some(new_value) = value_fn(None) {
insert_and_grow(key, new_value, &wnode, &mut wparent, parent_key, guard)?;
wnode.write_unlock_obsolete();
wparent.write_unlock();
} else {
wnode.write_unlock();
wparent.write_unlock();
}
match value_fn(None) {
UpdateAction::Nothing => {
wnode.write_unlock();
wparent.write_unlock();
}
UpdateAction::Insert(new_value) => {
insert_and_grow(key, new_value, &wnode, &mut wparent, parent_key, guard)?;
wnode.write_unlock_obsolete();
wparent.write_unlock();
}
UpdateAction::Remove => {
panic!("unexpected Remove action on insertion");
}
};
} else {
let mut wnode = rnode.upgrade_to_write_lock_or_restart()?;
if let Some((rparent, _)) = rparent {
rparent.read_unlock_or_restart()?;
}
if let Some(new_value) = value_fn(None) {
insert_to_node(&mut wnode, key, new_value, guard)?;
}
match value_fn(None) {
UpdateAction::Nothing => {}
UpdateAction::Insert(new_value) => {
insert_to_node(&mut wnode, key, new_value, guard)?;
}
UpdateAction::Remove => {
panic!("unexpected Remove action on insertion");
}
};
wnode.write_unlock();
}
return Ok(());
} else {
let next_node = next_node.unwrap(); // checked above it's not None
let next_child = next_node.unwrap(); // checked above it's not None
if let Some((rparent, _)) = rparent {
rparent.read_unlock_or_restart()?;
}
match next_node {
ChildOrValue::Value(existing_value_ptr) => {
assert!(key.len() == 1);
let mut wnode = rnode.upgrade_to_write_lock_or_restart()?;
// safety: Now that we have acquired the write lock, we have exclusive access to the
// value
let vmut = unsafe { existing_value_ptr.cast_mut().as_mut() }.unwrap();
if let Some(new_value) = value_fn(Some(vmut)) {
*vmut = new_value;
} else {
// TODO: Shrink the node
// TODO: If the node becomes empty, unlink it from parent
wnode.delete_value(key[0]);
}
wnode.write_unlock();
Ok(())
}
ChildOrValue::Child(next_child) => {
// recurse to next level
update_recurse(
&key[1..],
value_fn,
next_child,
Some((rnode, key[0])),
guard,
level + 1,
orig_key,
)
}
}
// recurse to next level
update_recurse(
&key[1..],
value_fn,
next_child,
Some((rnode, key[0])),
guard,
level + 1,
orig_key,
)
}
}
@@ -351,10 +381,19 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>(
path.push(PathElement::Prefix(Vec::from(prefix)));
}
if rnode.is_leaf() {
let vptr = rnode.get_leaf_value_ptr()?;
// safety: It's OK to return a ref of the pointer because we checked the version
// and the lifetime of 'epoch_pin' enforces that the reference is only accessible
// as long as the epoch is pinned.
let val = unsafe { vptr.as_ref().unwrap() };
eprintln!("{} {:?}: {:?}", indent, path, val);
}
for key_byte in 0..u8::MAX {
match rnode.find_child_or_value_or_restart(key_byte)? {
match rnode.find_child_or_restart(key_byte)? {
None => continue,
Some(ChildOrValue::Child(child_ref)) => {
Some(child_ref) => {
let rchild = child_ref.read_lock_or_restart()?;
eprintln!(
"{} {:?}, {}: prefix {:?}",
@@ -369,11 +408,6 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>(
dump_recurse(&child_path, child_ref, epoch_pin, level + 1)?;
}
Some(ChildOrValue::Value(val)) => {
eprintln!("{} {:?}, {}: {:?}", indent, path, key_byte, unsafe {
val.as_ref().unwrap()
});
}
}
}
@@ -429,12 +463,8 @@ fn insert_to_node<'e, K: Key, V: Value, A: ArtAllocator<V>>(
value: V,
guard: &'e TreeWriteGuard<K, V, A>,
) -> Result<(), OutOfMemoryError> {
if wnode.is_leaf() {
wnode.insert_value(key[0], value);
} else {
let value_child = allocate_node_for_value(&key[1..], value, guard.tree_writer.allocator)?;
wnode.insert_child(key[0], value_child.into_ptr());
}
let value_child = allocate_node_for_value(&key[1..], value, guard.tree_writer.allocator)?;
wnode.insert_child(key[0], value_child.into_ptr());
Ok(())
}
@@ -448,13 +478,10 @@ fn insert_and_grow<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>>(
guard: &'g mut TreeWriteGuard<'e, K, V, A>,
) -> Result<(), ArtError> {
let mut bigger_node = wnode.grow(guard.tree_writer.allocator)?;
if wnode.is_leaf() {
bigger_node.insert_value(key[0], value);
} else {
// FIXME: deallocate 'bigger_node' on OOM
let value_child = allocate_node_for_value(&key[1..], value, guard.tree_writer.allocator)?;
bigger_node.insert_new_child(key[0], value_child);
}
// FIXME: deallocate 'bigger_node' on OOM
let value_child = allocate_node_for_value(&key[1..], value, guard.tree_writer.allocator)?;
bigger_node.insert_new_child(key[0], value_child);
// Replace the pointer in the parent
parent.replace_child(parent_key_byte, bigger_node.into_ptr());
@@ -464,17 +491,16 @@ fn insert_and_grow<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>>(
Ok(())
}
// Allocate a new leaf node to hold 'value'. If key is long, we may need to allocate
// new internal nodes to hold it too
// Allocate a new leaf node to hold 'value'. If the key is long, we
// may need to allocate new internal nodes to hold it too
fn allocate_node_for_value<'a, V: Value, A: ArtAllocator<V>>(
key: &[u8],
value: V,
allocator: &'a A,
) -> Result<NewNodeRef<'a, V, A>, OutOfMemoryError> {
let mut prefix_off = key.len().saturating_sub(MAX_PREFIX_LEN + 1);
let mut prefix_off = key.len().saturating_sub(MAX_PREFIX_LEN);
let mut leaf_node = node_ref::new_leaf(&key[prefix_off..key.len() - 1], allocator)?;
leaf_node.insert_value(*key.last().unwrap(), value);
let leaf_node = node_ref::new_leaf(&key[prefix_off..key.len()], value, allocator)?;
let mut node = leaf_node;
while prefix_off > 0 {

View File

@@ -13,10 +13,7 @@ enum NodeTag {
Internal16,
Internal48,
Internal256,
Leaf4,
Leaf16,
Leaf48,
Leaf256,
Leaf,
}
#[repr(C)]
@@ -31,6 +28,12 @@ pub(crate) struct NodePtr<V> {
phantom_value: PhantomData<V>,
}
impl<V> PartialEq for NodePtr<V> {
fn eq(&self, other: &NodePtr<V>) -> bool {
self.ptr == other.ptr
}
}
impl<V> std::fmt::Debug for NodePtr<V> {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(fmt, "0x{}", self.ptr.addr())
@@ -52,10 +55,7 @@ enum NodeVariant<'a, V> {
Internal16(&'a NodeInternal16<V>),
Internal48(&'a NodeInternal48<V>),
Internal256(&'a NodeInternal256<V>),
Leaf4(&'a NodeLeaf4<V>),
Leaf16(&'a NodeLeaf16<V>),
Leaf48(&'a NodeLeaf48<V>),
Leaf256(&'a NodeLeaf256<V>),
Leaf(&'a NodeLeaf<V>),
}
enum NodeVariantMut<'a, V> {
@@ -63,15 +63,7 @@ enum NodeVariantMut<'a, V> {
Internal16(&'a mut NodeInternal16<V>),
Internal48(&'a mut NodeInternal48<V>),
Internal256(&'a mut NodeInternal256<V>),
Leaf4(&'a mut NodeLeaf4<V>),
Leaf16(&'a mut NodeLeaf16<V>),
Leaf48(&'a mut NodeLeaf48<V>),
Leaf256(&'a mut NodeLeaf256<V>),
}
pub(crate) enum ChildOrValuePtr<V> {
Child(NodePtr<V>),
Value(*const V),
Leaf(&'a mut NodeLeaf<V>),
}
#[repr(C)]
@@ -127,54 +119,14 @@ pub struct NodeInternal256<V> {
}
#[repr(C)]
pub struct NodeLeaf4<V> {
pub struct NodeLeaf<V> {
tag: NodeTag,
lock_and_version: AtomicLockAndVersion,
prefix: [u8; MAX_PREFIX_LEN],
prefix_len: u8,
num_values: u8,
child_keys: [u8; 4],
child_values: [Option<V>; 4],
}
#[repr(C)]
pub struct NodeLeaf16<V> {
tag: NodeTag,
lock_and_version: AtomicLockAndVersion,
prefix: [u8; MAX_PREFIX_LEN],
prefix_len: u8,
num_values: u8,
child_keys: [u8; 16],
child_values: [Option<V>; 16],
}
#[repr(C)]
pub struct NodeLeaf48<V> {
tag: NodeTag,
lock_and_version: AtomicLockAndVersion,
prefix: [u8; MAX_PREFIX_LEN],
prefix_len: u8,
num_values: u8,
child_indexes: [u8; 256],
child_values: [Option<V>; 48],
}
#[repr(C)]
pub struct NodeLeaf256<V> {
tag: NodeTag,
lock_and_version: AtomicLockAndVersion,
prefix: [u8; MAX_PREFIX_LEN],
prefix_len: u8,
num_values: u16,
child_values: [Option<V>; 256],
value: V,
}
impl<V> NodePtr<V> {
@@ -184,10 +136,7 @@ impl<V> NodePtr<V> {
NodeVariant::Internal16(_) => false,
NodeVariant::Internal48(_) => false,
NodeVariant::Internal256(_) => false,
NodeVariant::Leaf4(_) => true,
NodeVariant::Leaf16(_) => true,
NodeVariant::Leaf48(_) => true,
NodeVariant::Leaf256(_) => true,
NodeVariant::Leaf(_) => true,
}
}
@@ -197,10 +146,7 @@ impl<V> NodePtr<V> {
NodeVariant::Internal16(n) => &n.lock_and_version,
NodeVariant::Internal48(n) => &n.lock_and_version,
NodeVariant::Internal256(n) => &n.lock_and_version,
NodeVariant::Leaf4(n) => &n.lock_and_version,
NodeVariant::Leaf16(n) => &n.lock_and_version,
NodeVariant::Leaf48(n) => &n.lock_and_version,
NodeVariant::Leaf256(n) => &n.lock_and_version,
NodeVariant::Leaf(n) => &n.lock_and_version,
}
}
@@ -230,17 +176,8 @@ impl<V> NodePtr<V> {
NodeTag::Internal256 => NodeVariant::Internal256(
NonNull::new_unchecked(self.ptr.cast::<NodeInternal256<V>>()).as_ref(),
),
NodeTag::Leaf4 => NodeVariant::Leaf4(
NonNull::new_unchecked(self.ptr.cast::<NodeLeaf4<V>>()).as_ref(),
),
NodeTag::Leaf16 => NodeVariant::Leaf16(
NonNull::new_unchecked(self.ptr.cast::<NodeLeaf16<V>>()).as_ref(),
),
NodeTag::Leaf48 => NodeVariant::Leaf48(
NonNull::new_unchecked(self.ptr.cast::<NodeLeaf48<V>>()).as_ref(),
),
NodeTag::Leaf256 => NodeVariant::Leaf256(
NonNull::new_unchecked(self.ptr.cast::<NodeLeaf256<V>>()).as_ref(),
NodeTag::Leaf => NodeVariant::Leaf(
NonNull::new_unchecked(self.ptr.cast::<NodeLeaf<V>>()).as_ref(),
),
}
}
@@ -261,17 +198,8 @@ impl<V> NodePtr<V> {
NodeTag::Internal256 => NodeVariantMut::Internal256(
NonNull::new_unchecked(self.ptr.cast::<NodeInternal256<V>>()).as_mut(),
),
NodeTag::Leaf4 => NodeVariantMut::Leaf4(
NonNull::new_unchecked(self.ptr.cast::<NodeLeaf4<V>>()).as_mut(),
),
NodeTag::Leaf16 => NodeVariantMut::Leaf16(
NonNull::new_unchecked(self.ptr.cast::<NodeLeaf16<V>>()).as_mut(),
),
NodeTag::Leaf48 => NodeVariantMut::Leaf48(
NonNull::new_unchecked(self.ptr.cast::<NodeLeaf48<V>>()).as_mut(),
),
NodeTag::Leaf256 => NodeVariantMut::Leaf256(
NonNull::new_unchecked(self.ptr.cast::<NodeLeaf256<V>>()).as_mut(),
NodeTag::Leaf => NodeVariantMut::Leaf(
NonNull::new_unchecked(self.ptr.cast::<NodeLeaf<V>>()).as_mut(),
),
}
}
@@ -295,10 +223,7 @@ impl<V: Value> NodePtr<V> {
NodeVariant::Internal16(n) => n.get_prefix(),
NodeVariant::Internal48(n) => n.get_prefix(),
NodeVariant::Internal256(n) => n.get_prefix(),
NodeVariant::Leaf4(n) => n.get_prefix(),
NodeVariant::Leaf16(n) => n.get_prefix(),
NodeVariant::Leaf48(n) => n.get_prefix(),
NodeVariant::Leaf256(n) => n.get_prefix(),
NodeVariant::Leaf(n) => n.get_prefix(),
}
}
@@ -308,65 +233,27 @@ impl<V: Value> NodePtr<V> {
NodeVariant::Internal16(n) => n.is_full(),
NodeVariant::Internal48(n) => n.is_full(),
NodeVariant::Internal256(n) => n.is_full(),
NodeVariant::Leaf4(n) => n.is_full(),
NodeVariant::Leaf16(n) => n.is_full(),
NodeVariant::Leaf48(n) => n.is_full(),
NodeVariant::Leaf256(n) => n.is_full(),
NodeVariant::Leaf(_) => panic!("is_full() called on leaf node"),
}
}
pub(crate) fn find_child_or_value(&self, key_byte: u8) -> Option<ChildOrValuePtr<V>> {
pub(crate) fn find_child(&self, key_byte: u8) -> Option<NodePtr<V>> {
match self.variant() {
NodeVariant::Internal4(n) => n.find_child(key_byte).map(|c| ChildOrValuePtr::Child(c)),
NodeVariant::Internal16(n) => n.find_child(key_byte).map(|c| ChildOrValuePtr::Child(c)),
NodeVariant::Internal48(n) => n.find_child(key_byte).map(|c| ChildOrValuePtr::Child(c)),
NodeVariant::Internal256(n) => {
n.find_child(key_byte).map(|c| ChildOrValuePtr::Child(c))
}
NodeVariant::Leaf4(n) => n
.get_leaf_value(key_byte)
.map(|v| ChildOrValuePtr::Value(v)),
NodeVariant::Leaf16(n) => n
.get_leaf_value(key_byte)
.map(|v| ChildOrValuePtr::Value(v)),
NodeVariant::Leaf48(n) => n
.get_leaf_value(key_byte)
.map(|v| ChildOrValuePtr::Value(v)),
NodeVariant::Leaf256(n) => n
.get_leaf_value(key_byte)
.map(|v| ChildOrValuePtr::Value(v)),
NodeVariant::Internal4(n) => n.find_child(key_byte),
NodeVariant::Internal16(n) => n.find_child(key_byte),
NodeVariant::Internal48(n) => n.find_child(key_byte),
NodeVariant::Internal256(n) => n.find_child(key_byte),
NodeVariant::Leaf(_) => panic!("find_child called on leaf node"),
}
}
pub(crate) fn find_next_child_or_value(
&self,
key_byte: u8,
) -> Option<(u8, ChildOrValuePtr<V>)> {
pub(crate) fn find_next_child(&self, key_byte: u8) -> Option<(u8, NodePtr<V>)> {
match self.variant() {
NodeVariant::Internal4(n) => n
.find_next_child(key_byte)
.map(|(k, c)| (k, ChildOrValuePtr::Child(c))),
NodeVariant::Internal16(n) => n
.find_next_child(key_byte)
.map(|(k, c)| (k, ChildOrValuePtr::Child(c))),
NodeVariant::Internal48(n) => n
.find_next_child(key_byte)
.map(|(k, c)| (k, ChildOrValuePtr::Child(c))),
NodeVariant::Internal256(n) => n
.find_next_child(key_byte)
.map(|(k, c)| (k, ChildOrValuePtr::Child(c))),
NodeVariant::Leaf4(n) => n
.find_next_leaf_value(key_byte)
.map(|(k, v)| (k, ChildOrValuePtr::Value(v))),
NodeVariant::Leaf16(n) => n
.find_next_leaf_value(key_byte)
.map(|(k, v)| (k, ChildOrValuePtr::Value(v))),
NodeVariant::Leaf48(n) => n
.find_next_leaf_value(key_byte)
.map(|(k, v)| (k, ChildOrValuePtr::Value(v))),
NodeVariant::Leaf256(n) => n
.find_next_leaf_value(key_byte)
.map(|(k, v)| (k, ChildOrValuePtr::Value(v))),
NodeVariant::Internal4(n) => n.find_next_child(key_byte),
NodeVariant::Internal16(n) => n.find_next_child(key_byte),
NodeVariant::Internal48(n) => n.find_next_child(key_byte),
NodeVariant::Internal256(n) => n.find_next_child(key_byte),
NodeVariant::Leaf(_) => panic!("find_next_child called on leaf node"),
}
}
@@ -376,10 +263,7 @@ impl<V: Value> NodePtr<V> {
NodeVariantMut::Internal16(n) => n.truncate_prefix(new_prefix_len),
NodeVariantMut::Internal48(n) => n.truncate_prefix(new_prefix_len),
NodeVariantMut::Internal256(n) => n.truncate_prefix(new_prefix_len),
NodeVariantMut::Leaf4(n) => n.truncate_prefix(new_prefix_len),
NodeVariantMut::Leaf16(n) => n.truncate_prefix(new_prefix_len),
NodeVariantMut::Leaf48(n) => n.truncate_prefix(new_prefix_len),
NodeVariantMut::Leaf256(n) => n.truncate_prefix(new_prefix_len),
NodeVariantMut::Leaf(n) => n.truncate_prefix(new_prefix_len),
}
}
@@ -389,10 +273,7 @@ impl<V: Value> NodePtr<V> {
NodeVariant::Internal16(n) => n.grow(allocator),
NodeVariant::Internal48(n) => n.grow(allocator),
NodeVariant::Internal256(_) => panic!("cannot grow Internal256 node"),
NodeVariant::Leaf4(n) => n.grow(allocator),
NodeVariant::Leaf16(n) => n.grow(allocator),
NodeVariant::Leaf48(n) => n.grow(allocator),
NodeVariant::Leaf256(_) => panic!("cannot grow Leaf256 node"),
NodeVariant::Leaf(_) => panic!("cannot grow Leaf node"),
}
}
@@ -402,10 +283,7 @@ impl<V: Value> NodePtr<V> {
NodeVariantMut::Internal16(n) => n.insert_child(key_byte, child),
NodeVariantMut::Internal48(n) => n.insert_child(key_byte, child),
NodeVariantMut::Internal256(n) => n.insert_child(key_byte, child),
NodeVariantMut::Leaf4(_)
| NodeVariantMut::Leaf16(_)
| NodeVariantMut::Leaf48(_)
| NodeVariantMut::Leaf256(_) => panic!("insert_child called on leaf node"),
NodeVariantMut::Leaf(_) => panic!("insert_child called on leaf node"),
}
}
@@ -415,36 +293,37 @@ impl<V: Value> NodePtr<V> {
NodeVariantMut::Internal16(n) => n.replace_child(key_byte, replacement),
NodeVariantMut::Internal48(n) => n.replace_child(key_byte, replacement),
NodeVariantMut::Internal256(n) => n.replace_child(key_byte, replacement),
NodeVariantMut::Leaf4(_)
| NodeVariantMut::Leaf16(_)
| NodeVariantMut::Leaf48(_)
| NodeVariantMut::Leaf256(_) => panic!("replace_child called on leaf node"),
NodeVariantMut::Leaf(_) => panic!("replace_child called on leaf node"),
}
}
pub(crate) fn insert_value(&mut self, key_byte: u8, value: V) {
pub(crate) fn delete_child(&mut self, key_byte: u8) {
match self.variant_mut() {
NodeVariantMut::Internal4(n) => n.delete_child(key_byte),
NodeVariantMut::Internal16(n) => n.delete_child(key_byte),
NodeVariantMut::Internal48(n) => n.delete_child(key_byte),
NodeVariantMut::Internal256(n) => n.delete_child(key_byte),
NodeVariantMut::Leaf(_) => panic!("delete_child called on leaf node"),
}
}
pub(crate) fn get_leaf_value(&self) -> &V {
match self.variant() {
NodeVariant::Internal4(_)
| NodeVariant::Internal16(_)
| NodeVariant::Internal48(_)
| NodeVariant::Internal256(_) => panic!("get_leaf_value called on internal node"),
NodeVariant::Leaf(n) => n.get_leaf_value(),
}
}
pub(crate) fn get_leaf_value_mut(&mut self) -> &mut V {
match self.variant_mut() {
NodeVariantMut::Internal4(_)
| NodeVariantMut::Internal16(_)
| NodeVariantMut::Internal48(_)
| NodeVariantMut::Internal256(_) => panic!("insert_value called on internal node"),
NodeVariantMut::Leaf4(n) => n.insert_value(key_byte, value),
NodeVariantMut::Leaf16(n) => n.insert_value(key_byte, value),
NodeVariantMut::Leaf48(n) => n.insert_value(key_byte, value),
NodeVariantMut::Leaf256(n) => n.insert_value(key_byte, value),
}
}
pub(crate) fn delete_value(&mut self, key_byte: u8) {
match self.variant_mut() {
NodeVariantMut::Internal4(_)
| NodeVariantMut::Internal16(_)
| NodeVariantMut::Internal48(_)
| NodeVariantMut::Internal256(_) => panic!("delete_value called on internal node"),
NodeVariantMut::Leaf4(n) => n.delete_value(key_byte),
NodeVariantMut::Leaf16(n) => n.delete_value(key_byte),
NodeVariantMut::Leaf48(n) => n.delete_value(key_byte),
NodeVariantMut::Leaf256(n) => n.delete_value(key_byte),
| NodeVariantMut::Internal256(_) => panic!("get_leaf_value called on internal node"),
NodeVariantMut::Leaf(n) => n.get_leaf_value_mut(),
}
}
@@ -454,10 +333,7 @@ impl<V: Value> NodePtr<V> {
NodeVariant::Internal16(_) => allocator.dealloc_node_internal16(self.ptr.cast()),
NodeVariant::Internal48(_) => allocator.dealloc_node_internal48(self.ptr.cast()),
NodeVariant::Internal256(_) => allocator.dealloc_node_internal256(self.ptr.cast()),
NodeVariant::Leaf4(_) => allocator.dealloc_node_leaf4(self.ptr.cast()),
NodeVariant::Leaf16(_) => allocator.dealloc_node_leaf16(self.ptr.cast()),
NodeVariant::Leaf48(_) => allocator.dealloc_node_leaf48(self.ptr.cast()),
NodeVariant::Leaf256(_) => allocator.dealloc_node_leaf256(self.ptr.cast()),
NodeVariant::Leaf(_) => allocator.dealloc_node_leaf(self.ptr.cast()),
}
}
}
@@ -497,21 +373,19 @@ pub fn new_internal<V: Value>(prefix: &[u8], allocator: &impl ArtAllocator<V>) -
ptr.into()
}
pub fn new_leaf<V: Value>(prefix: &[u8], allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
let ptr: *mut NodeLeaf4<V> = allocator.alloc_node_leaf4().cast();
pub fn new_leaf<V: Value>(prefix: &[u8], value: V, allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
let ptr: *mut NodeLeaf<V> = allocator.alloc_node_leaf().cast();
if ptr.is_null() {
panic!("out of memory");
}
let mut init = NodeLeaf4 {
tag: NodeTag::Leaf4,
let mut init = NodeLeaf {
tag: NodeTag::Leaf,
lock_and_version: AtomicLockAndVersion::new(),
prefix: [8; MAX_PREFIX_LEN],
prefix_len: prefix.len() as u8,
num_values: 0,
child_keys: [0; 4],
child_values: [const { None }; 4],
value,
};
init.prefix[0..prefix.len()].copy_from_slice(prefix);
unsafe { ptr.write(init) };
@@ -574,6 +448,20 @@ impl<V: Value> NodeInternal4<V> {
panic!("could not re-find parent with key {}", key_byte);
}
fn delete_child(&mut self, key_byte: u8) {
for i in 0..self.num_children as usize {
if self.child_keys[i] == key_byte {
self.num_children -= 1;
for j in i..self.num_children as usize {
self.child_keys[j] = self.child_keys[j + 1];
self.child_ptrs[j] = self.child_ptrs[j + 1];
}
return;
}
}
panic!("could not re-find parent with key {}", key_byte);
}
fn is_full(&self) -> bool {
self.num_children == 4
}
@@ -667,6 +555,20 @@ impl<V: Value> NodeInternal16<V> {
panic!("could not re-find parent with key {}", key_byte);
}
fn delete_child(&mut self, key_byte: u8) {
for i in 0..self.num_children as usize {
if self.child_keys[i] == key_byte {
self.num_children -= 1;
for j in i..self.num_children as usize {
self.child_keys[j] = self.child_keys[j + 1];
self.child_ptrs[j] = self.child_ptrs[j + 1];
}
return;
}
}
panic!("could not re-find parent with key {}", key_byte);
}
fn is_full(&self) -> bool {
self.num_children == 16
}
@@ -742,11 +644,32 @@ impl<V: Value> NodeInternal48<V> {
fn replace_child(&mut self, key_byte: u8, replacement: NodePtr<V>) {
let idx = self.child_indexes[key_byte as usize];
if idx != INVALID_CHILD_INDEX {
self.child_ptrs[idx as usize] = replacement
} else {
if idx == INVALID_CHILD_INDEX {
panic!("could not re-find parent with key {}", key_byte);
}
self.child_ptrs[idx as usize] = replacement
}
fn delete_child(&mut self, key_byte: u8) {
let idx = self.child_indexes[key_byte as usize] as usize;
if idx == INVALID_CHILD_INDEX as usize {
panic!("could not re-find parent with key {}", key_byte);
}
self.child_indexes[key_byte as usize] = INVALID_CHILD_INDEX;
self.num_children -= 1;
// Compact the child_ptrs array
let removed_idx = self.num_children as usize;
if idx != removed_idx {
for i in 0..u8::MAX as usize {
if self.child_indexes[i] as usize == removed_idx {
self.child_indexes[i] = idx as u8;
self.child_ptrs[idx] = self.child_ptrs[removed_idx];
return;
}
}
panic!("could not re-find last index on Internal48 node");
}
}
fn is_full(&self) -> bool {
@@ -830,6 +753,15 @@ impl<V: Value> NodeInternal256<V> {
}
}
fn delete_child(&mut self, key_byte: u8) {
let idx = key_byte as usize;
if self.child_ptrs[idx].is_null() {
panic!("could not re-find parent with key {}", key_byte);
}
self.num_children -= 1;
self.child_ptrs[idx] = NodePtr::null();
}
fn is_full(&self) -> bool {
self.num_children == 256
}
@@ -842,7 +774,7 @@ impl<V: Value> NodeInternal256<V> {
}
}
impl<V: Value> NodeLeaf4<V> {
impl<V: Value> NodeLeaf<V> {
fn get_prefix(&self) -> &[u8] {
&self.prefix[0..self.prefix_len as usize]
}
@@ -857,346 +789,12 @@ impl<V: Value> NodeLeaf4<V> {
self.prefix_len = new_prefix_len as u8;
}
fn get_leaf_value<'a: 'b, 'b>(&'a self, key: u8) -> Option<&'b V> {
for i in 0..self.num_values {
if self.child_keys[i as usize] == key {
assert!(self.child_values[i as usize].is_some());
return self.child_values[i as usize].as_ref();
}
}
None
fn get_leaf_value<'a: 'b, 'b>(&'a self) -> &'b V {
&self.value
}
fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> {
let mut found: Option<(usize, u8)> = None;
for i in 0..self.num_values as usize {
let this_key = self.child_keys[i];
if this_key >= min_key {
if let Some((_, found_key)) = found {
if this_key < found_key {
found = Some((i, this_key));
}
} else {
found = Some((i, this_key));
}
}
}
if let Some((found_idx, found_key)) = found {
Some((found_key, self.child_values[found_idx].as_ref().unwrap()))
} else {
None
}
}
fn is_full(&self) -> bool {
self.num_values == 4
}
fn insert_value(&mut self, key_byte: u8, value: V) {
assert!(self.num_values < 4);
let idx = self.num_values as usize;
self.child_keys[idx] = key_byte;
self.child_values[idx] = Some(value);
self.num_values += 1;
}
fn grow(&self, allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
let ptr: *mut NodeLeaf16<V> = allocator.alloc_node_leaf16();
if ptr.is_null() {
panic!("out of memory");
}
let mut init = NodeLeaf16 {
tag: NodeTag::Leaf16,
lock_and_version: AtomicLockAndVersion::new(),
prefix: self.prefix.clone(),
prefix_len: self.prefix_len,
num_values: self.num_values,
child_keys: [0; 16],
child_values: [const { None }; 16],
};
for i in 0..self.num_values as usize {
init.child_keys[i] = self.child_keys[i];
init.child_values[i] = self.child_values[i].clone();
}
unsafe { ptr.write(init) };
ptr.into()
}
fn delete_value(&mut self, key_byte: u8) {
assert!(self.num_values <= 4);
for i in 0..self.num_values as usize {
if self.child_keys[i] == key_byte {
assert!(self.child_values[i].is_some());
if i < self.num_values as usize - 1 {
self.child_keys[i] = self.child_keys[self.num_values as usize - 1];
self.child_values[i] = std::mem::replace(
&mut self.child_values[self.num_values as usize - 1],
None,
);
}
self.num_values -= 1;
return;
}
}
panic!("key to delete not found in leaf4 node");
}
}
impl<V: Value> NodeLeaf16<V> {
fn get_prefix(&self) -> &[u8] {
&self.prefix[0..self.prefix_len as usize]
}
fn truncate_prefix(&mut self, new_prefix_len: usize) {
assert!(new_prefix_len < self.prefix_len as usize);
let prefix = &mut self.prefix;
let offset = self.prefix_len as usize - new_prefix_len;
for i in 0..new_prefix_len {
prefix[i] = prefix[i + offset];
}
self.prefix_len = new_prefix_len as u8;
}
fn get_leaf_value(&self, key: u8) -> Option<&V> {
for i in 0..self.num_values {
if self.child_keys[i as usize] == key {
assert!(self.child_values[i as usize].is_some());
return self.child_values[i as usize].as_ref();
}
}
None
}
fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> {
let mut found: Option<(usize, u8)> = None;
for i in 0..self.num_values as usize {
let this_key = self.child_keys[i];
if this_key >= min_key {
if let Some((_, found_key)) = found {
if this_key < found_key {
found = Some((i, this_key));
}
} else {
found = Some((i, this_key));
}
}
}
if let Some((found_idx, found_key)) = found {
Some((found_key, self.child_values[found_idx].as_ref().unwrap()))
} else {
None
}
}
fn is_full(&self) -> bool {
self.num_values == 16
}
fn insert_value(&mut self, key_byte: u8, value: V) {
assert!(self.num_values < 16);
let idx = self.num_values as usize;
self.child_keys[idx] = key_byte;
self.child_values[idx] = Some(value);
self.num_values += 1;
}
fn grow(&self, allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
let ptr: *mut NodeLeaf48<V> = allocator.alloc_node_leaf48().cast();
if ptr.is_null() {
panic!("out of memory");
}
let mut init = NodeLeaf48 {
tag: NodeTag::Leaf48,
lock_and_version: AtomicLockAndVersion::new(),
prefix: self.prefix.clone(),
prefix_len: self.prefix_len,
num_values: self.num_values,
child_indexes: [INVALID_CHILD_INDEX; 256],
child_values: [const { None }; 48],
};
for i in 0..self.num_values {
let idx = self.child_keys[i as usize];
init.child_indexes[idx as usize] = i;
init.child_values[i as usize] = self.child_values[i as usize].clone();
}
unsafe { ptr.write(init) };
ptr.into()
}
fn delete_value(&mut self, key_byte: u8) {
assert!(self.num_values <= 16);
for i in 0..self.num_values as usize {
if self.child_keys[i as usize] == key_byte {
assert!(self.child_values[i as usize].is_some());
if i < self.num_values as usize - 1 {
self.child_keys[i] = self.child_keys[self.num_values as usize - 1];
self.child_values[i] = std::mem::replace(
&mut self.child_values[self.num_values as usize - 1],
None,
);
}
self.num_values -= 1;
return;
}
}
panic!("key to delete not found in leaf16 node");
}
}
impl<V: Value> NodeLeaf48<V> {
fn get_prefix(&self) -> &[u8] {
&self.prefix[0..self.prefix_len as usize]
}
fn truncate_prefix(&mut self, new_prefix_len: usize) {
assert!(new_prefix_len < self.prefix_len as usize);
let prefix = &mut self.prefix;
let offset = self.prefix_len as usize - new_prefix_len;
for i in 0..new_prefix_len {
prefix[i] = prefix[i + offset];
}
self.prefix_len = new_prefix_len as u8;
}
fn get_leaf_value(&self, key: u8) -> Option<&V> {
let idx = self.child_indexes[key as usize];
if idx != INVALID_CHILD_INDEX {
assert!(self.child_values[idx as usize].is_some());
self.child_values[idx as usize].as_ref()
} else {
None
}
}
fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> {
for key in min_key..=u8::MAX {
let idx = self.child_indexes[key as usize];
if idx != INVALID_CHILD_INDEX {
return Some((key, &self.child_values[idx as usize].as_ref().unwrap()));
}
}
None
}
fn is_full(&self) -> bool {
self.num_values == 48
}
fn insert_value(&mut self, key_byte: u8, value: V) {
assert!(self.num_values < 48);
assert!(self.child_indexes[key_byte as usize] == INVALID_CHILD_INDEX);
let idx = self.num_values;
self.child_indexes[key_byte as usize] = idx;
self.child_values[idx as usize] = Some(value);
self.num_values += 1;
}
fn grow(&self, allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
let ptr: *mut NodeLeaf256<V> = allocator.alloc_node_leaf256();
if ptr.is_null() {
panic!("out of memory");
}
let mut init = NodeLeaf256 {
tag: NodeTag::Leaf256,
lock_and_version: AtomicLockAndVersion::new(),
prefix: self.prefix.clone(),
prefix_len: self.prefix_len,
num_values: self.num_values as u16,
child_values: [const { None }; 256],
};
for i in 0..256 {
let idx = self.child_indexes[i];
if idx != INVALID_CHILD_INDEX {
init.child_values[i] = self.child_values[idx as usize].clone();
}
}
unsafe { ptr.write(init) };
ptr.into()
}
fn delete_value(&mut self, key_byte: u8) {
assert!(self.num_values <= 48);
let idx = self.child_indexes[key_byte as usize];
if idx == INVALID_CHILD_INDEX {
panic!("key to delete not found in leaf48 node");
}
self.child_indexes[key_byte as usize] = INVALID_CHILD_INDEX;
self.num_values -= 1;
if idx < self.num_values {
// Move all existing values with higher indexes down one position
for i in idx as usize..self.num_values as usize {
self.child_values[i] = std::mem::replace(&mut self.child_values[i + 1], None);
}
// Update all higher indexes
for i in 0..256 {
if self.child_indexes[i] != INVALID_CHILD_INDEX {
if self.child_indexes[i] > idx {
self.child_indexes[i] -= 1;
}
assert!(self.child_indexes[i] < self.num_values);
}
}
}
}
}
impl<V: Value> NodeLeaf256<V> {
fn get_prefix(&self) -> &[u8] {
&self.prefix[0..self.prefix_len as usize]
}
fn truncate_prefix(&mut self, new_prefix_len: usize) {
assert!(new_prefix_len < self.prefix_len as usize);
let prefix = &mut self.prefix;
let offset = self.prefix_len as usize - new_prefix_len;
for i in 0..new_prefix_len {
prefix[i] = prefix[i + offset];
}
self.prefix_len = new_prefix_len as u8;
}
fn get_leaf_value(&self, key: u8) -> Option<&V> {
let idx = key as usize;
self.child_values[idx].as_ref()
}
fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> {
for key in min_key..=u8::MAX {
if let Some(v) = &self.child_values[key as usize] {
return Some((key, v));
}
}
None
}
fn is_full(&self) -> bool {
self.num_values == 256
}
fn insert_value(&mut self, key_byte: u8, value: V) {
assert!(self.num_values < 256);
assert!(self.child_values[key_byte as usize].is_none());
self.child_values[key_byte as usize] = Some(value);
self.num_values += 1;
}
fn delete_value(&mut self, key_byte: u8) {
if self.child_values[key_byte as usize].is_none() {
panic!("key to delete not found in leaf256 node");
}
self.child_values[key_byte as usize] = None;
self.num_values -= 1;
fn get_leaf_value_mut<'a: 'b, 'b>(&'a mut self) -> &'b mut V {
&mut self.value
}
}
@@ -1250,34 +848,8 @@ impl<V: Value> From<*mut NodeInternal256<V>> for NodePtr<V> {
}
}
impl<V: Value> From<*mut NodeLeaf4<V>> for NodePtr<V> {
fn from(val: *mut NodeLeaf4<V>) -> NodePtr<V> {
NodePtr {
ptr: val.cast(),
phantom_value: PhantomData,
}
}
}
impl<V: Value> From<*mut NodeLeaf16<V>> for NodePtr<V> {
fn from(val: *mut NodeLeaf16<V>) -> NodePtr<V> {
NodePtr {
ptr: val.cast(),
phantom_value: PhantomData,
}
}
}
impl<V: Value> From<*mut NodeLeaf48<V>> for NodePtr<V> {
fn from(val: *mut NodeLeaf48<V>) -> NodePtr<V> {
NodePtr {
ptr: val.cast(),
phantom_value: PhantomData,
}
}
}
impl<V: Value> From<*mut NodeLeaf256<V>> for NodePtr<V> {
fn from(val: *mut NodeLeaf256<V>) -> NodePtr<V> {
impl<V: Value> From<*mut NodeLeaf<V>> for NodePtr<V> {
fn from(val: *mut NodeLeaf<V>) -> NodePtr<V> {
NodePtr {
ptr: val.cast(),
phantom_value: PhantomData,

View File

@@ -2,7 +2,6 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use super::node_ptr;
use super::node_ptr::ChildOrValuePtr;
use super::node_ptr::NodePtr;
use crate::EpochPin;
use crate::Value;
@@ -56,12 +55,11 @@ pub struct ReadLockedNodeRef<'e, V> {
phantom: PhantomData<&'e EpochPin<'e>>,
}
pub(crate) enum ChildOrValue<'e, V> {
Child(NodeRef<'e, V>),
Value(*const V),
}
impl<'e, V: Value> ReadLockedNodeRef<'e, V> {
pub(crate) fn is_leaf(&self) -> bool {
self.ptr.is_leaf()
}
pub(crate) fn is_full(&self) -> bool {
self.ptr.is_full()
}
@@ -78,43 +76,51 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> {
self.ptr.prefix_matches(key)
}
pub(crate) fn find_child_or_value_or_restart(
pub(crate) fn find_child_or_restart(
&self,
key_byte: u8,
) -> Result<Option<ChildOrValue<'e, V>>, ConcurrentUpdateError> {
let child_or_value = self.ptr.find_child_or_value(key_byte);
) -> Result<Option<NodeRef<'e, V>>, ConcurrentUpdateError> {
let child_or_value = self.ptr.find_child(key_byte);
self.ptr.lockword().check_or_restart(self.version)?;
match child_or_value {
None => Ok(None),
Some(ChildOrValuePtr::Value(vptr)) => Ok(Some(ChildOrValue::Value(vptr))),
Some(ChildOrValuePtr::Child(child_ptr)) => Ok(Some(ChildOrValue::Child(NodeRef {
Some(child_ptr) => Ok(Some(NodeRef {
ptr: child_ptr,
phantom: self.phantom,
}))),
})),
}
}
pub(crate) fn find_next_child_or_value_or_restart(
pub(crate) fn find_next_child_or_restart(
&self,
min_key_byte: u8,
) -> Result<Option<(u8, ChildOrValue<'e, V>)>, ConcurrentUpdateError> {
let child_or_value = self.ptr.find_next_child_or_value(min_key_byte);
) -> Result<Option<(u8, NodeRef<'e, V>)>, ConcurrentUpdateError> {
let child_or_value = self.ptr.find_next_child(min_key_byte);
self.ptr.lockword().check_or_restart(self.version)?;
match child_or_value {
None => Ok(None),
Some((k, ChildOrValuePtr::Value(vptr))) => Ok(Some((k, ChildOrValue::Value(vptr)))),
Some((k, ChildOrValuePtr::Child(child_ptr))) => Ok(Some((
Some((k, child_ptr)) => Ok(Some((
k,
ChildOrValue::Child(NodeRef {
NodeRef {
ptr: child_ptr,
phantom: self.phantom,
}),
},
))),
}
}
pub(crate) fn get_leaf_value_ptr(&self) -> Result<*const V, ConcurrentUpdateError> {
let result = self.ptr.get_leaf_value();
self.ptr.lockword().check_or_restart(self.version)?;
// Extend the lifetime.
let result = std::ptr::from_ref(result);
Ok(result)
}
pub(crate) fn upgrade_to_write_lock_or_restart(
self,
) -> Result<WriteLockedNodeRef<'e, V>, ConcurrentUpdateError> {
@@ -142,10 +148,6 @@ pub struct WriteLockedNodeRef<'e, V> {
}
impl<'e, V: Value> WriteLockedNodeRef<'e, V> {
pub(crate) fn is_leaf(&self) -> bool {
self.ptr.is_leaf()
}
pub(crate) fn write_unlock(mut self) {
self.ptr.lockword().write_unlock();
self.ptr = NodePtr::null();
@@ -168,12 +170,8 @@ impl<'e, V: Value> WriteLockedNodeRef<'e, V> {
self.ptr.insert_child(key_byte, child)
}
pub(crate) fn insert_value(&mut self, key_byte: u8, value: V) {
self.ptr.insert_value(key_byte, value)
}
pub(crate) fn delete_value(&mut self, key_byte: u8) {
self.ptr.delete_value(key_byte)
pub(crate) fn get_leaf_value_mut(&mut self) -> &mut V {
self.ptr.get_leaf_value_mut()
}
pub(crate) fn grow<'a, A>(
@@ -199,6 +197,10 @@ impl<'e, V: Value> WriteLockedNodeRef<'e, V> {
pub(crate) fn replace_child(&mut self, key_byte: u8, replacement: NodePtr<V>) {
self.ptr.replace_child(key_byte, replacement);
}
pub(crate) fn delete_child(&mut self, key_byte: u8) {
self.ptr.delete_child(key_byte);
}
}
impl<'e, V> Drop for WriteLockedNodeRef<'e, V> {
@@ -229,10 +231,6 @@ where
self.ptr.insert_child(key_byte, child.as_ptr())
}
pub(crate) fn insert_value(&mut self, key_byte: u8, value: V) {
self.ptr.insert_value(key_byte, value)
}
pub(crate) fn into_ptr(mut self) -> NodePtr<V> {
let ptr = self.ptr;
self.ptr = NodePtr::null();
@@ -279,6 +277,7 @@ where
pub(crate) fn new_leaf<'a, V, A>(
prefix: &[u8],
value: V,
allocator: &'a A,
) -> Result<NewNodeRef<'a, V, A>, OutOfMemoryError>
where
@@ -286,7 +285,7 @@ where
A: ArtAllocator<V>,
{
Ok(NewNodeRef {
ptr: node_ptr::new_leaf(prefix, allocator),
ptr: node_ptr::new_leaf(prefix, value, allocator),
allocator,
extra_nodes: Vec::new(),
})

View File

@@ -15,8 +15,7 @@ use spin;
use crate::ArtTreeStatistics;
use crate::Tree;
pub use crate::algorithm::node_ptr::{
NodeInternal4, NodeInternal16, NodeInternal48, NodeInternal256, NodeLeaf4, NodeLeaf16,
NodeLeaf48, NodeLeaf256,
NodeInternal4, NodeInternal16, NodeInternal48, NodeInternal256, NodeLeaf,
};
pub struct OutOfMemoryError();
@@ -28,19 +27,13 @@ pub trait ArtAllocator<V: crate::Value> {
fn alloc_node_internal16(&self) -> *mut NodeInternal16<V>;
fn alloc_node_internal48(&self) -> *mut NodeInternal48<V>;
fn alloc_node_internal256(&self) -> *mut NodeInternal256<V>;
fn alloc_node_leaf4(&self) -> *mut NodeLeaf4<V>;
fn alloc_node_leaf16(&self) -> *mut NodeLeaf16<V>;
fn alloc_node_leaf48(&self) -> *mut NodeLeaf48<V>;
fn alloc_node_leaf256(&self) -> *mut NodeLeaf256<V>;
fn alloc_node_leaf(&self) -> *mut NodeLeaf<V>;
fn dealloc_node_internal4(&self, ptr: *mut NodeInternal4<V>);
fn dealloc_node_internal16(&self, ptr: *mut NodeInternal16<V>);
fn dealloc_node_internal48(&self, ptr: *mut NodeInternal48<V>);
fn dealloc_node_internal256(&self, ptr: *mut NodeInternal256<V>);
fn dealloc_node_leaf4(&self, ptr: *mut NodeLeaf4<V>);
fn dealloc_node_leaf16(&self, ptr: *mut NodeLeaf16<V>);
fn dealloc_node_leaf48(&self, ptr: *mut NodeLeaf48<V>);
fn dealloc_node_leaf256(&self, ptr: *mut NodeLeaf256<V>);
fn dealloc_node_leaf(&self, ptr: *mut NodeLeaf<V>);
}
pub struct ArtMultiSlabAllocator<'t, V>
@@ -49,21 +42,18 @@ where
{
tree_area: spin::Mutex<Option<&'t mut MaybeUninit<Tree<V>>>>,
inner: MultiSlabAllocator<'t, 8>,
inner: MultiSlabAllocator<'t, 5>,
phantom_val: PhantomData<V>,
}
impl<'t, V: crate::Value> ArtMultiSlabAllocator<'t, V> {
const LAYOUTS: [Layout; 8] = [
const LAYOUTS: [Layout; 5] = [
Layout::new::<NodeInternal4<V>>(),
Layout::new::<NodeInternal16<V>>(),
Layout::new::<NodeInternal48<V>>(),
Layout::new::<NodeInternal256<V>>(),
Layout::new::<NodeLeaf4<V>>(),
Layout::new::<NodeLeaf16<V>>(),
Layout::new::<NodeLeaf48<V>>(),
Layout::new::<NodeLeaf256<V>>(),
Layout::new::<NodeLeaf<V>>(),
];
pub fn new(area: &'t mut [MaybeUninit<u8>]) -> &'t mut ArtMultiSlabAllocator<'t, V> {
@@ -101,18 +91,9 @@ impl<'t, V: crate::Value> ArtAllocator<V> for ArtMultiSlabAllocator<'t, V> {
fn alloc_node_internal256(&self) -> *mut NodeInternal256<V> {
self.inner.alloc_slab(3).cast()
}
fn alloc_node_leaf4(&self) -> *mut NodeLeaf4<V> {
fn alloc_node_leaf(&self) -> *mut NodeLeaf<V> {
self.inner.alloc_slab(4).cast()
}
fn alloc_node_leaf16(&self) -> *mut NodeLeaf16<V> {
self.inner.alloc_slab(5).cast()
}
fn alloc_node_leaf48(&self) -> *mut NodeLeaf48<V> {
self.inner.alloc_slab(6).cast()
}
fn alloc_node_leaf256(&self) -> *mut NodeLeaf256<V> {
self.inner.alloc_slab(7).cast()
}
fn dealloc_node_internal4(&self, ptr: *mut NodeInternal4<V>) {
self.inner.dealloc_slab(0, ptr.cast())
@@ -127,18 +108,9 @@ impl<'t, V: crate::Value> ArtAllocator<V> for ArtMultiSlabAllocator<'t, V> {
fn dealloc_node_internal256(&self, ptr: *mut NodeInternal256<V>) {
self.inner.dealloc_slab(3, ptr.cast())
}
fn dealloc_node_leaf4(&self, ptr: *mut NodeLeaf4<V>) {
fn dealloc_node_leaf(&self, ptr: *mut NodeLeaf<V>) {
self.inner.dealloc_slab(4, ptr.cast())
}
fn dealloc_node_leaf16(&self, ptr: *mut NodeLeaf16<V>) {
self.inner.dealloc_slab(5, ptr.cast())
}
fn dealloc_node_leaf48(&self, ptr: *mut NodeLeaf48<V>) {
self.inner.dealloc_slab(6, ptr.cast())
}
fn dealloc_node_leaf256(&self, ptr: *mut NodeLeaf256<V>) {
self.inner.dealloc_slab(7, ptr.cast())
}
}
impl<'t, V: crate::Value> ArtMultiSlabAllocator<'t, V> {

View File

@@ -235,7 +235,6 @@ impl SlabDesc {
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -20,8 +20,7 @@
//!
//! - All keys have the same length
//!
//! - Multi-value leaves. The values are stored directly in one of the four different leaf node
//! types.
//! - Single-value leaves.
//!
//! - For collapsing inner nodes, we use the Pessimistic approach, where each inner node stores a
//! variable length "prefix", which stores the keys of all the one-way nodes which have been
@@ -144,7 +143,7 @@ pub use allocator::ArtMultiSlabAllocator;
/// Fixed-length key type.
///
pub trait Key: Clone + Debug {
pub trait Key: Debug {
const KEY_LEN: usize;
fn as_bytes(&self) -> &[u8];
@@ -154,7 +153,8 @@ pub trait Key: Clone + Debug {
///
/// Values need to be Cloneable, because when a node "grows", the value is copied to a new node and
/// the old sticks around until all readers that might see the old value are gone.
pub trait Value: Clone {}
// fixme obsolete, no longer needs Clone
pub trait Value {}
const MAX_GARBAGE: usize = 1024;
@@ -277,7 +277,7 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator<V>> TreeInitStruct<'t, K, V,
}
}
impl<'t, K: Key + Clone, V: Value, A: ArtAllocator<V>> TreeWriteAccess<'t, K, V, A> {
impl<'t, K: Key, V: Value, A: ArtAllocator<V>> TreeWriteAccess<'t, K, V, A> {
pub fn start_write<'g>(&'t self) -> TreeWriteGuard<'g, K, V, A>
where
't: 'g,
@@ -299,7 +299,7 @@ impl<'t, K: Key + Clone, V: Value, A: ArtAllocator<V>> TreeWriteAccess<'t, K, V,
}
}
impl<'t, K: Key + Clone, V: Value> TreeReadAccess<'t, K, V> {
impl<'t, K: Key, V: Value> TreeReadAccess<'t, K, V> {
pub fn start_read(&'t self) -> TreeReadGuard<'t, K, V> {
TreeReadGuard {
tree: &self.tree,
@@ -340,23 +340,58 @@ where
created_garbage: bool,
}
impl<'t, K: Key, V: Value, A: ArtAllocator<V>> TreeWriteGuard<'t, K, V, A> {
pub enum UpdateAction<V> {
Nothing,
Insert(V),
Remove,
}
impl<'e, K: Key, V: Value, A: ArtAllocator<V>> TreeWriteGuard<'e, K, V, A> {
/// Get a value
pub fn get(&'t mut self, key: &K) -> Option<&'t V> {
pub fn get(&'e mut self, key: &K) -> Option<&'e V> {
algorithm::search(key, self.tree_writer.tree.root, &self.epoch_pin)
}
/// Insert a value
pub fn insert(self, key: &K, value: V) {
self.update_with_fn(key, |_| Some(value))
pub fn insert(self, key: &K, value: V) -> Result<(), ()> {
let mut success = None;
self.update_with_fn(key, |existing| {
if let Some(_) = existing {
success = Some(false);
UpdateAction::Nothing
} else {
success = Some(true);
UpdateAction::Insert(value)
}
});
if success.expect("value_fn not called") {
Ok(())
} else {
Err(())
}
}
/// Remove value
pub fn remove(self, key: &K) -> Option<V> {
/// Remove value. Returns true if it existed
pub fn remove(self, key: &K) -> bool
{
let mut result = false;
self.update_with_fn(key, |existing| {
result = existing.is_some();
UpdateAction::Remove
});
result
}
/// Try to remove value and return the old value.
pub fn remove_and_return(self, key: &K) -> Option<V>
where
V: Clone,
{
let mut old = None;
self.update_with_fn(key, |existing| {
old = existing.cloned();
None
UpdateAction::Remove
});
old
}
@@ -366,10 +401,10 @@ impl<'t, K: Key, V: Value, A: ArtAllocator<V>> TreeWriteGuard<'t, K, V, A> {
/// The function is passed a reference to the existing value, if any. If the function
/// returns None, the value is removed from the tree (or if there was no existing value,
/// does nothing). If the function returns Some, the existing value is replaced, of if there
/// was no existing value, it is inserted.
/// was no existing value, it is inserted. FIXME: update comment
pub fn update_with_fn<F>(mut self, key: &K, value_fn: F)
where
F: FnOnce(Option<&V>) -> Option<V>,
F: FnOnce(Option<&V>) -> UpdateAction<V>,
{
algorithm::update_fn(key, value_fn, self.tree_writer.tree.root, &mut self);
@@ -511,12 +546,12 @@ fn increment_key(key: &mut [u8]) -> bool {
}
// Debugging functions
impl<'t, K: Key, V: Value + Debug> TreeReadGuard<'t, K, V> {
impl<'e, K: Key, V: Value + Debug> TreeReadGuard<'e, K, V> {
pub fn dump(&mut self) {
algorithm::dump_tree(self.tree.root, &self.epoch_pin)
}
}
impl<'t, K: Key, V: Value + Debug> TreeWriteGuard<'t, K, V, ArtMultiSlabAllocator<'t, V>> {
impl<'e, K: Key, V: Value + Debug> TreeWriteGuard<'e, K, V, ArtMultiSlabAllocator<'e, V>> {
pub fn get_statistics(&self) -> ArtTreeStatistics {
self.tree_writer.allocator.get_statistics()
}

View File

@@ -1,11 +1,14 @@
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::ArtAllocator;
use crate::ArtMultiSlabAllocator;
use crate::TreeInitStruct;
use crate::TreeIterator;
use crate::TreeWriteAccess;
use crate::UpdateAction;
use crate::{Key, Value};
@@ -55,7 +58,8 @@ fn test_inserts<K: Into<TestKey> + Copy>(keys: &[K]) {
for (idx, k) in keys.iter().enumerate() {
let w = tree_writer.start_write();
w.insert(&(*k).into(), idx);
let res = w.insert(&(*k).into(), idx);
assert!(res.is_ok());
}
for (idx, k) in keys.iter().enumerate() {
@@ -103,12 +107,38 @@ fn sparse() {
test_inserts(&keys);
}
#[derive(Clone, Copy, Debug)]
struct TestValue(AtomicUsize);
impl TestValue {
fn new(val: usize) -> TestValue {
TestValue(AtomicUsize::new(val))
}
fn load(&self) -> usize {
self.0.load(Ordering::Relaxed)
}
}
impl Value for TestValue {}
impl Clone for TestValue {
fn clone(&self) -> TestValue {
TestValue::new(self.load())
}
}
impl Debug for TestValue {
fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(fmt, "{:?}", self.load())
}
}
#[derive(Clone, Debug)]
struct TestOp(TestKey, Option<usize>);
fn apply_op<A: ArtAllocator<usize>>(
fn apply_op<A: ArtAllocator<TestValue>>(
op: &TestOp,
tree: &TreeWriteAccess<TestKey, usize, A>,
tree: &TreeWriteAccess<TestKey, TestValue, A>,
shadow: &mut BTreeMap<TestKey, usize>,
) {
eprintln!("applying op: {op:?}");
@@ -123,24 +153,33 @@ fn apply_op<A: ArtAllocator<usize>>(
// apply to Art tree
let w = tree.start_write();
w.update_with_fn(&op.0, |existing| {
assert_eq!(existing, shadow_existing.as_ref());
return op.1;
assert_eq!(existing.map(TestValue::load), shadow_existing);
match (existing, op.1) {
(None, None) => UpdateAction::Nothing,
(None, Some(new_val)) => UpdateAction::Insert(TestValue::new(new_val)),
(Some(_old_val), None) => UpdateAction::Remove,
(Some(old_val), Some(new_val)) => {
old_val.0.store(new_val, Ordering::Relaxed);
UpdateAction::Nothing
}
}
});
}
fn test_iter<A: ArtAllocator<usize>>(
tree: &TreeWriteAccess<TestKey, usize, A>,
fn test_iter<A: ArtAllocator<TestValue>>(
tree: &TreeWriteAccess<TestKey, TestValue, A>,
shadow: &BTreeMap<TestKey, usize>,
) {
let mut shadow_iter = shadow.iter();
let mut iter = TreeIterator::new(&(TestKey::MIN..TestKey::MAX));
loop {
let shadow_item = shadow_iter.next().map(|(k, v)| (k.clone(), v));
let shadow_item = shadow_iter.next().map(|(k, v)| (k.clone(), v.clone()));
let r = tree.start_read();
let item = iter.next(&r);
if shadow_item != item {
if shadow_item != item.map(|(k, v)| (k, v.load())) {
eprintln!(
"FAIL: iterator returned {:?}, expected {:?}",
item, shadow_item
@@ -170,7 +209,7 @@ fn random_ops() {
let allocator = ArtMultiSlabAllocator::new(&mut area);
let init_struct = TreeInitStruct::<TestKey, usize, _>::new(allocator);
let init_struct = TreeInitStruct::<TestKey, TestValue, _>::new(allocator);
let tree_writer = init_struct.attach_writer();
let mut shadow: std::collections::BTreeMap<TestKey, usize> = BTreeMap::new();

View File

@@ -9,6 +9,7 @@ crate-type = ["staticlib"]
[dependencies]
axum.workspace = true
bytes.workspace = true
clashmap.workspace = true
http.workspace = true
libc.workspace = true
nix.workspace = true

View File

@@ -20,6 +20,8 @@ use tokio::task::spawn_blocking;
pub type CacheBlock = u64;
pub const INVALID_CACHE_BLOCK: CacheBlock = u64::MAX;
pub struct FileCache {
file: Arc<File>,
@@ -39,10 +41,7 @@ struct FreeList {
}
impl FileCache {
pub fn new(
file_cache_path: &Path,
mut initial_size: u64,
) -> Result<FileCache, std::io::Error> {
pub fn new(file_cache_path: &Path, mut initial_size: u64) -> Result<FileCache, std::io::Error> {
if initial_size < 100 {
tracing::warn!(
"min size for file cache is 100 blocks, {} requested",
@@ -95,7 +94,8 @@ impl FileCache {
let dst_ref = unsafe { std::slice::from_raw_parts_mut(dst.stable_mut_ptr(), BLCKSZ) };
spawn_blocking(move || file.read_exact_at(dst_ref, cache_block as u64 * BLCKSZ as u64)).await??;
spawn_blocking(move || file.read_exact_at(dst_ref, cache_block as u64 * BLCKSZ as u64))
.await??;
Ok(())
}
@@ -109,7 +109,8 @@ impl FileCache {
let src_ref = unsafe { std::slice::from_raw_parts(src.stable_ptr(), BLCKSZ) };
spawn_blocking(move || file.write_all_at(src_ref, cache_block as u64 * BLCKSZ as u64)).await??;
spawn_blocking(move || file.write_all_at(src_ref, cache_block as u64 * BLCKSZ as u64))
.await??;
Ok(())
}

View File

@@ -24,15 +24,17 @@
use std::mem::MaybeUninit;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use utils::lsn::Lsn;
use utils::lsn::{Lsn, AtomicLsn};
use zerocopy::FromBytes;
use crate::file_cache::{CacheBlock, FileCache};
use crate::file_cache::INVALID_CACHE_BLOCK;
use pageserver_page_api::model::RelTag;
use neonart;
use neonart::UpdateAction;
use neonart::TreeInitStruct;
use neonart::TreeIterator;
@@ -123,36 +125,25 @@ impl<'t> IntegratedCacheInitStruct<'t> {
}
}
#[derive(Clone)]
enum TreeEntry {
Rel(RelEntry),
Block(BlockEntry),
}
struct BlockEntry {
lw_lsn: Lsn,
cache_block: Option<CacheBlock>,
lw_lsn: AtomicLsn,
cache_block: AtomicU64,
io_in_progress: AtomicBool,
pinned: AtomicBool,
// 'referenced' bit for the clock algorithm
referenced: AtomicBool,
}
impl Clone for BlockEntry {
fn clone(&self) -> BlockEntry {
BlockEntry {
lw_lsn: self.lw_lsn,
cache_block: self.cache_block,
referenced: AtomicBool::new(self.referenced.load(Ordering::Relaxed)),
}
}
}
#[derive(Clone, Default)]
struct RelEntry {
/// cached size of the relation
nblocks: Option<u32>,
/// u32::MAX means 'not known' (that's InvalidBlockNumber in Postgres)
nblocks: AtomicU32,
}
#[derive(
@@ -272,7 +263,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
};
block_entry.referenced.store(true, Ordering::Relaxed);
if let Some(cache_block) = block_entry.cache_block {
let cache_block = block_entry.cache_block.load(Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
self.file_cache
.as_ref()
.unwrap()
@@ -280,7 +272,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
.await?;
Ok(CacheResult::Found(()))
} else {
Ok(CacheResult::NotFound(block_entry.lw_lsn))
Ok(CacheResult::NotFound(block_entry.lw_lsn.load()))
}
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
@@ -305,10 +297,12 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
// in cache.
block_entry.referenced.store(true, Ordering::Relaxed);
if let Some(_cache_block) = block_entry.cache_block {
let cache_block = block_entry.cache_block.load(Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
Ok(CacheResult::Found(()))
} else {
Ok(CacheResult::NotFound(block_entry.lw_lsn))
Ok(CacheResult::NotFound(block_entry.lw_lsn.load()))
}
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
@@ -341,12 +335,19 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) {
let w = self.cache_tree.start_write();
w.insert(
&TreeKey::from(rel),
TreeEntry::Rel(RelEntry {
nblocks: Some(nblocks),
}),
);
w.update_with_fn(&TreeKey::from(rel), |existing| {
match existing {
None => UpdateAction::Insert(
TreeEntry::Rel(RelEntry {
nblocks: AtomicU32::new(nblocks),
})),
Some(TreeEntry::Block(_)) => panic!("unexpected tree entry type for rel key"),
Some(TreeEntry::Rel(rel)) => {
rel.nblocks.store(nblocks, Ordering::Relaxed);
UpdateAction::Nothing
}
}
});
}
/// Remember the given page contents in the cache.
@@ -356,58 +357,159 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
block_number: u32,
src: impl uring_common::buf::IoBuf + Send + Sync,
lw_lsn: Lsn,
is_write: bool,
) {
if let Some(file_cache) = self.file_cache.as_ref() {
let key = TreeKey::from((rel, block_number));
// FIXME: make this work when file cache is disabled. Or make it mandatory
let file_cache = self.file_cache.as_ref().unwrap();
if is_write {
// there should be no concurrent IOs. If a backend tries to read the page
// at the same time, they may get a torn write. That's the same as with
// regular POSIX filesystem read() and write()
// First check if we have a block in cache already
let w = self.cache_tree.start_write();
let key = TreeKey::from((rel, block_number));
let mut reserved_cache_block = loop {
if let Some(x) = file_cache.alloc_block() {
break Some(x);
}
if let Some(x) = self.try_evict_one_cache_block() {
break Some(x);
}
};
let mut cache_block = None;
let mut old_cache_block = None;
let mut found_existing = false;
w.update_with_fn(&key, |existing| {
if let Some(existing) = existing {
let mut block_entry = if let TreeEntry::Block(e) = existing.clone() {
let block_entry = if let TreeEntry::Block(e) = existing {
e
} else {
panic!("unexpected tree entry type for block key");
};
block_entry.referenced.store(true, Ordering::Relaxed);
block_entry.lw_lsn = lw_lsn;
if block_entry.cache_block.is_none() {
block_entry.cache_block = reserved_cache_block.take();
found_existing = true;
// Prevent this entry from being evicted
let was_pinned = block_entry.pinned.swap(true, Ordering::Relaxed);
if was_pinned {
// this is unexpected, because the caller has obtained the io-in-progress lock,
// so no one else should try to modify the page at the same time.
panic!("block entry was unexpectedly pinned");
}
let cache_block = block_entry.cache_block.load(Ordering::Relaxed);
old_cache_block = if cache_block != INVALID_CACHE_BLOCK {
Some(cache_block)
} else {
None
};
}
// if there was no existing entry, we will insert one, but not yet
UpdateAction::Nothing
});
// Allocate a new block if required
let cache_block = old_cache_block.unwrap_or_else(|| {
loop {
if let Some(x) = file_cache.alloc_block() {
break x;
}
if let Some(x) = self.try_evict_one_cache_block() {
break x;
}
cache_block = block_entry.cache_block;
Some(TreeEntry::Block(block_entry))
} else {
cache_block = reserved_cache_block.take();
Some(TreeEntry::Block(BlockEntry {
lw_lsn: lw_lsn,
cache_block: cache_block,
referenced: AtomicBool::new(true),
}))
}
});
// If we didn't need to block we reserved, put it back to the free list
if let Some(x) = reserved_cache_block {
file_cache.dealloc_block(x);
}
let cache_block = cache_block.unwrap();
// Write the page to the cache file
file_cache
.write_block(cache_block, src)
.await
.expect("error writing to cache");
};
// FIXME: handle errors gracefully.
// FIXME: unpin the block entry on error
// Update the block entry
let w = self.cache_tree.start_write();
w.update_with_fn(&key, |existing| {
assert_eq!(found_existing, existing.is_some());
if let Some(existing) = existing {
let block_entry = if let TreeEntry::Block(e) = existing {
e
} else {
panic!("unexpected tree entry type for block key");
};
// Update the cache block
let old_blk = block_entry.cache_block.compare_exchange(INVALID_CACHE_BLOCK, cache_block, Ordering::Relaxed, Ordering::Relaxed);
assert!(old_blk == Ok(INVALID_CACHE_BLOCK) || old_blk == Err(cache_block));
block_entry.lw_lsn.store(lw_lsn);
block_entry.referenced.store(true, Ordering::Relaxed);
let was_pinned = block_entry.pinned.swap(false, Ordering::Relaxed);
assert!(was_pinned);
UpdateAction::Nothing
}
else
{
UpdateAction::Insert(TreeEntry::Block(BlockEntry {
lw_lsn: AtomicLsn::new(lw_lsn.0),
cache_block: AtomicU64::new(cache_block),
pinned: AtomicBool::new(false),
referenced: AtomicBool::new(true),
}))
}
});
} else {
// !is_write
//
// We can assume that it doesn't already exist, because the
// caller is assumed to have already checked it, and holds
// the io-in-progress lock. (The BlockEntry might exist, but no cache block)
// Allocate a new block first
let cache_block = {
loop {
if let Some(x) = file_cache.alloc_block() {
break x;
}
if let Some(x) = self.try_evict_one_cache_block() {
break x;
}
}
};
// Write the page to the cache file
file_cache
.write_block(cache_block, src)
.await
.expect("error writing to cache");
// FIXME: handle errors gracefully.
let w = self.cache_tree.start_write();
w.update_with_fn(&key, |existing| {
if let Some(existing) = existing {
let block_entry = if let TreeEntry::Block(e) = existing {
e
} else {
panic!("unexpected tree entry type for block key");
};
assert!(!block_entry.pinned.load(Ordering::Relaxed));
let old_cache_block = block_entry.cache_block.swap(cache_block, Ordering::Relaxed);
if old_cache_block != INVALID_CACHE_BLOCK {
panic!("remember_page called in !is_write mode, but page is already cached at blk {}", old_cache_block);
}
UpdateAction::Nothing
} else {
UpdateAction::Insert(TreeEntry::Block(BlockEntry {
lw_lsn: AtomicLsn::new(lw_lsn.0),
cache_block: AtomicU64::new(cache_block),
pinned: AtomicBool::new(false),
referenced: AtomicBool::new(true),
}))
}
});
}
}
/// Forget information about given relation in the cache. (For DROP TABLE and such)
@@ -447,17 +549,26 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
if !blk_entry.referenced.swap(false, Ordering::Relaxed) {
// Evict this
let w = self.cache_tree.start_write();
let old = w.remove(&k);
if let Some(TreeEntry::Block(old)) = old {
let _ = self
.global_lw_lsn
.fetch_max(old.lw_lsn.0, Ordering::Relaxed);
if let Some(cache_block) = old.cache_block {
return Some(cache_block);
let mut evicted_cache_block = None;
w.update_with_fn(&k, |old| {
match old {
None => UpdateAction::Nothing,
Some(TreeEntry::Rel(_)) => panic!("unexepcted Rel entry"),
Some(TreeEntry::Block(old)) => {
let _ = self
.global_lw_lsn
.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
let cache_block = old.cache_block.load(Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
evicted_cache_block = Some(cache_block);
}
// TODO: we don't evict the entry, just the block. Does it make
// sense to keep the entry?
UpdateAction::Nothing
}
}
} else {
assert!(old.is_none());
}
});
}
}
}
@@ -479,7 +590,8 @@ fn get_rel_size<'t>(r: &neonart::TreeReadGuard<TreeKey, TreeEntry>, rel: &RelTag
panic!("unexpected tree entry type for rel key");
};
if let Some(nblocks) = rel_entry.nblocks {
let nblocks = rel_entry.nblocks.load(Ordering::Relaxed);
if nblocks != u32::MAX {
Some(nblocks)
} else {
None
@@ -526,7 +638,12 @@ impl<'e> BackendCacheReadOp<'e> {
};
block_entry.referenced.store(true, Ordering::Relaxed);
block_entry.cache_block
let cache_block = block_entry.cache_block.load(Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
Some(cache_block)
} else {
None
}
} else {
None
}

View File

@@ -0,0 +1,81 @@
use std::hash::Hash;
use std::sync::Arc;
use std::cmp::Eq;
use tokio::sync::{Mutex, OwnedMutexGuard};
use clashmap::ClashMap;
use clashmap::Entry;
use pageserver_page_api::model;
#[derive(Clone, Eq, Hash, PartialEq)]
pub enum RequestInProgressKey {
Db(u32),
Rel(model::RelTag),
Block(model::RelTag, u32),
}
pub type RequestInProgressTable = MutexHashSet<RequestInProgressKey>;
// more primitive locking thingie:
pub struct MutexHashSet<K>
where K: Clone + Eq + Hash
{
lock_table: ClashMap<K, Arc<Mutex<()>>>,
}
pub struct MutexHashSetGuard<'a, K>
where K: Clone + Eq + Hash
{
pub key: K,
set: &'a MutexHashSet<K>,
mutex: Arc<Mutex<()>>,
_guard: OwnedMutexGuard<()>,
}
impl<'a, K> Drop for MutexHashSetGuard<'a, K>
where K: Clone + Eq + Hash
{
fn drop(&mut self) {
let (_old_key, old_val) = self.set.lock_table.remove(&self.key).unwrap();
assert!(Arc::ptr_eq(&old_val, &self.mutex));
// the guard will be dropped as we return
}
}
impl<K> MutexHashSet<K>
where K: Clone + Eq + Hash
{
pub fn new() -> MutexHashSet<K> {
MutexHashSet {
lock_table: ClashMap::new(),
}
}
pub async fn lock<'a>(&'a self, key: K) -> MutexHashSetGuard<'a, K>
{
let my_mutex = Arc::new(Mutex::new(()));
let my_guard = Arc::clone(&my_mutex).lock_owned().await;
loop {
let lock = match self.lock_table.entry(key.clone()) {
Entry::Occupied(e) => Arc::clone(e.get()),
Entry::Vacant(e) => {
e.insert(Arc::clone(&my_mutex));
break;
}
};
let _ = lock.lock().await;
}
MutexHashSetGuard {
key: key,
set: &self,
mutex: my_mutex,
_guard: my_guard,
}
}
}

View File

@@ -8,12 +8,13 @@ use crate::init::CommunicatorInitStruct;
use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess};
use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest};
use crate::neon_request::{NeonIORequest, NeonIOResult};
use crate::worker_process::in_progress_ios::{RequestInProgressTable, RequestInProgressKey};
use pageserver_client_grpc::PageserverClient;
use pageserver_page_api::model;
use tokio::io::AsyncReadExt;
use uring_common::buf::IoBuf;
use tokio_pipe::PipeRead;
use uring_common::buf::IoBuf;
use super::callbacks::{get_request_lsn, notify_proc};
@@ -31,8 +32,11 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
submission_pipe_read_raw_fd: i32,
next_request_id: AtomicU64,
in_progress_table: RequestInProgressTable,
}
pub(super) async fn init(
cis: Box<CommunicatorInitStruct>,
tenant_id: String,
@@ -45,10 +49,7 @@ pub(super) async fn init(
let last_lsn = get_request_lsn();
let file_cache = if let Some(path) = file_cache_path {
Some(
FileCache::new(&path, file_cache_size)
.expect("could not create cache file"),
)
Some(FileCache::new(&path, file_cache_size).expect("could not create cache file"))
} else {
// FIXME: temporarily for testing, use LFC even if disabled
Some(
@@ -70,6 +71,7 @@ pub(super) async fn init(
cache,
submission_pipe_read_raw_fd: cis.submission_pipe_read_fd,
next_request_id: AtomicU64::new(1),
in_progress_table: RequestInProgressTable::new(),
};
this
@@ -142,6 +144,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
NeonIORequest::RelExists(req) => {
let rel = req.reltag();
let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone()));
let not_modified_since = match self.cache.get_rel_exists(&rel) {
CacheResult::Found(exists) => return NeonIOResult::RelExists(exists),
CacheResult::NotFound(lsn) => lsn,
@@ -166,6 +170,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
NeonIORequest::RelSize(req) => {
let rel = req.reltag();
let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone()));
// Check the cache first
let not_modified_since = match self.cache.get_rel_size(&rel) {
CacheResult::Found(nblocks) => {
@@ -207,6 +213,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
NeonIOResult::PrefetchVLaunched
}
NeonIORequest::DbSize(req) => {
let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Db(req.db_oid));
// Check the cache first
let not_modified_since = match self.cache.get_db_size(req.db_oid) {
CacheResult::Found(db_size) => {
@@ -236,30 +244,36 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
NeonIORequest::WritePage(req) => {
// Also store it in the LFC while we still have it
let rel = req.reltag();
let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), req.block_number));
self.cache
.remember_page(&rel, req.block_number, req.src, Lsn(req.lsn))
.remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
.await;
NeonIOResult::WriteOK
}
NeonIORequest::RelExtend(req) => {
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache
.remember_rel_size(&req.reltag(), req.block_number + 1);
NeonIOResult::WriteOK
}
NeonIORequest::RelZeroExtend(req) => {
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache
.remember_rel_size(&req.reltag(), req.block_number + req.nblocks);
NeonIOResult::WriteOK
}
NeonIORequest::RelCreate(req) => {
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache.remember_rel_size(&req.reltag(), 0);
NeonIOResult::WriteOK
}
NeonIORequest::RelTruncate(req) => {
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache.remember_rel_size(&req.reltag(), req.nblocks);
NeonIOResult::WriteOK
}
NeonIORequest::RelUnlink(req) => {
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache.forget_rel(&req.reltag());
NeonIOResult::WriteOK
}
@@ -270,9 +284,14 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
let rel = req.reltag();
// Check the cache first
let mut cache_misses = Vec::new();
let mut cache_misses = Vec::with_capacity(req.nblocks as usize);
for i in 0..req.nblocks {
let blkno = req.block_number + i as u32;
// note: this is deadlock-safe even though we hold multiple locks at the same time,
// because they're always acquired in the same order.
let in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), blkno)).await;
let dest = req.dest[i as usize];
let not_modified_since = match self.cache.get_page(&rel, blkno, dest).await {
Ok(CacheResult::Found(_)) => {
@@ -283,19 +302,19 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
Ok(CacheResult::NotFound(lsn)) => lsn,
Err(_io_error) => return Err(-1), // FIXME errno?
};
cache_misses.push((blkno, not_modified_since, dest));
cache_misses.push((blkno, not_modified_since, dest, in_progress_guard));
}
if cache_misses.is_empty() {
return Ok(());
}
let not_modified_since = cache_misses
.iter()
.map(|(_blkno, lsn, _dest)| *lsn)
.map(|(_blkno, lsn, _dest, _guard)| *lsn)
.max()
.unwrap();
// TODO: Use batched protocol
for (blkno, _lsn, dest) in cache_misses.iter() {
for (blkno, _lsn, dest, _guard) in cache_misses.iter() {
match self
.pageserver_client
.get_page(&model::GetPageRequest {
@@ -316,11 +335,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
std::ptr::copy_nonoverlapping(src.as_ptr(), dest.as_mut_ptr(), len);
};
trace!("remembering blk {} in rel {:?} in LFC", blkno, rel);
// Also store it in the LFC while we have it
self.cache
.remember_page(&rel, *blkno, page_image, not_modified_since)
.remember_page(&rel, *blkno, page_image, not_modified_since, false)
.await;
}
Err(err) => {
@@ -339,29 +356,34 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
let rel = req.reltag();
// Check the cache first
let mut cache_misses = Vec::new();
let mut cache_misses = Vec::with_capacity(req.nblocks as usize);
for i in 0..req.nblocks {
let blkno = req.block_number + i as u32;
// note: this is deadlock-safe even though we hold multiple locks at the same time,
// because they're always acquired in the same order.
let in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), blkno)).await;
let not_modified_since = match self.cache.page_is_cached(&rel, blkno).await {
Ok(CacheResult::Found(_)) => {
trace!("found blk {} in rel {:?} in LFC ", req.block_number, rel);
trace!("found blk {} in rel {:?} in LFC ", blkno, rel);
continue;
}
Ok(CacheResult::NotFound(lsn)) => lsn,
Err(_io_error) => return Err(-1), // FIXME errno?
};
cache_misses.push((req.block_number, not_modified_since));
cache_misses.push((blkno, not_modified_since, in_progress_guard));
}
if cache_misses.is_empty() {
return Ok(());
}
let not_modified_since = cache_misses.iter().map(|(_blkno, lsn)| *lsn).max().unwrap();
let not_modified_since = cache_misses.iter().map(|(_blkno, lsn, _guard)| *lsn).max().unwrap();
// TODO: spawn separate tasks for these. Use the integrated cache to keep track of the
// in-flight requests
// TODO: Use batched protocol
for (blkno, _lsn) in cache_misses.iter() {
for (blkno, _lsn, _guard) in cache_misses.iter() {
match self
.pageserver_client
.get_page(&model::GetPageRequest {
@@ -376,10 +398,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
Ok(page_image) => {
trace!(
"prefetch completed, remembering blk {} in rel {:?} in LFC",
req.block_number, rel
*blkno, rel
);
self.cache
.remember_page(&rel, req.block_number, page_image, not_modified_since)
.remember_page(&rel, *blkno, page_image, not_modified_since, false)
.await;
}
Err(err) => {

View File

@@ -10,3 +10,5 @@ mod logging;
mod main_loop;
mod metrics_exporter;
mod worker_interface;
mod in_progress_ios;