From e6a4171fa1ba1ced5ef372de4731116a6120e887 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 10 May 2025 02:36:48 +0300 Subject: [PATCH] fix concurrency issues with the LFC - Add another locking hash table to track which cached pages are currently being modified, by smgrwrite() or smgrread() or by prefetch. - Use single-value Leaf pages in the art tree. That seems simpler after all, and it eliminates some corner cases where a Value needed to be cloned, which made it tricky to use atomics or other interior mutability on the Values --- Cargo.lock | 1 + libs/neonart/src/algorithm.rs | 230 +++--- libs/neonart/src/algorithm/node_ptr.rs | 692 ++++-------------- libs/neonart/src/algorithm/node_ref.rs | 67 +- libs/neonart/src/allocator.rs | 44 +- libs/neonart/src/allocator/slab.rs | 1 - libs/neonart/src/lib.rs | 69 +- libs/neonart/src/tests.rs | 61 +- pgxn/neon/communicator/Cargo.toml | 1 + pgxn/neon/communicator/src/file_cache.rs | 13 +- .../neon/communicator/src/integrated_cache.rs | 265 +++++-- .../src/worker_process/in_progress_ios.rs | 81 ++ .../src/worker_process/main_loop.rs | 62 +- .../communicator/src/worker_process/mod.rs | 2 + 14 files changed, 728 insertions(+), 861 deletions(-) create mode 100644 pgxn/neon/communicator/src/worker_process/in_progress_ios.rs diff --git a/Cargo.lock b/Cargo.lock index 655542c12d..c881c68a97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1349,6 +1349,7 @@ dependencies = [ "axum 0.8.1", "bytes", "cbindgen", + "clashmap", "http 1.1.0", "libc", "metrics", diff --git a/libs/neonart/src/algorithm.rs b/libs/neonart/src/algorithm.rs index 573ef87c92..c0c4b19e93 100644 --- a/libs/neonart/src/algorithm.rs +++ b/libs/neonart/src/algorithm.rs @@ -6,12 +6,12 @@ use std::vec::Vec; use crate::algorithm::lock_and_version::ConcurrentUpdateError; use crate::algorithm::node_ptr::MAX_PREFIX_LEN; -use crate::algorithm::node_ref::ChildOrValue; use crate::algorithm::node_ref::{NewNodeRef, NodeRef, ReadLockedNodeRef, WriteLockedNodeRef}; use crate::allocator::OutOfMemoryError; use crate::GarbageQueueFullError; use crate::TreeWriteGuard; +use crate::UpdateAction; use crate::allocator::ArtAllocator; use crate::epoch::EpochPin; use crate::{Key, Value}; @@ -89,7 +89,7 @@ pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator, F>( root: RootPtr, guard: &'g mut TreeWriteGuard<'e, K, V, A>, ) where - F: FnOnce(Option<&V>) -> Option, + F: FnOnce(Option<&V>) -> UpdateAction, { let value_fn_cell = std::cell::Cell::new(Some(value_fn)); loop { @@ -108,7 +108,6 @@ pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator, F>( ) { Ok(()) => break, Err(ArtError::ConcurrentUpdate) => { - eprintln!("retrying"); continue; // retry } Err(ArtError::OutOfMemory) => { @@ -150,21 +149,25 @@ fn lookup_recurse<'e, V: Value>( rnode.read_unlock_or_restart()?; return Ok(None); }; + + if rnode.is_leaf() { + assert_eq!(key.len(), prefix_len); + let vptr = rnode.get_leaf_value_ptr()?; + // safety: It's OK to return a ref of the pointer because we checked the version + // and the lifetime of 'epoch_pin' enforces that the reference is only accessible + // as long as the epoch is pinned. + let v = unsafe { vptr.as_ref().unwrap() }; + return Ok(Some(v)); + } + let key = &key[prefix_len..]; // find child (or leaf value) - let next_node = rnode.find_child_or_value_or_restart(key[0])?; + let next_node = rnode.find_child_or_restart(key[0])?; match next_node { None => Ok(None), // key not found - Some(ChildOrValue::Value(vptr)) => { - // safety: It's OK to return a ref of the pointer because we checked the version - // and the lifetime of 'epoch_pin' enforces that the reference is only accessible - // as long as the epoch is pinned. - let v = unsafe { vptr.as_ref().unwrap() }; - Ok(Some(v)) - } - Some(ChildOrValue::Child(v)) => lookup_recurse(&key[1..], v, Some(rnode), epoch_pin), + Some(child) => lookup_recurse(&key[1..], child, Some(rnode), epoch_pin), } } @@ -179,23 +182,36 @@ fn next_recurse<'e, V: Value>( if prefix.len() != 0 { path.extend_from_slice(prefix); } - assert!(path.len() < min_key.len()); use std::cmp::Ordering; - let mut min_key_byte = match path.as_slice().cmp(&min_key[0..path.len()]) { - Ordering::Less => { - rnode.read_unlock_or_restart()?; - return Ok(None); - } + let comparison = path.as_slice().cmp(&min_key[0..path.len()]); + if comparison == Ordering::Less { + rnode.read_unlock_or_restart()?; + return Ok(None); + } + + if rnode.is_leaf() { + assert_eq!(path.len(), min_key.len()); + let vptr = rnode.get_leaf_value_ptr()?; + // safety: It's OK to return a ref of the pointer because we checked the version + // and the lifetime of 'epoch_pin' enforces that the reference is only accessible + // as long as the epoch is pinned. + let v = unsafe { vptr.as_ref().unwrap() }; + return Ok(Some(v)); + } + + let mut min_key_byte = match comparison { + Ordering::Less => unreachable!(), // checked this above already Ordering::Equal => min_key[path.len()], Ordering::Greater => 0, }; + loop { - match rnode.find_next_child_or_value_or_restart(min_key_byte)? { + match rnode.find_next_child_or_restart(min_key_byte)? { None => { return Ok(None); } - Some((key_byte, ChildOrValue::Child(child_ref))) => { + Some((key_byte, child_ref)) => { let path_len = path.len(); path.push(key_byte); let result = next_recurse(min_key, path, child_ref, epoch_pin)?; @@ -208,15 +224,6 @@ fn next_recurse<'e, V: Value>( path.truncate(path_len); min_key_byte = key_byte + 1; } - Some((key_byte, ChildOrValue::Value(vptr))) => { - path.push(key_byte); - assert_eq!(path.len(), min_key.len()); - // safety: It's OK to return a ref of the pointer because we checked the version - // and the lifetime of 'epoch_pin' enforces that the reference is only accessible - // as long as the epoch is pinned. - let v = unsafe { vptr.as_ref().unwrap() }; - return Ok(Some(v)); - } } } } @@ -232,7 +239,7 @@ pub(crate) fn update_recurse<'e, 'g, K: Key, V: Value, A: ArtAllocator, F>( orig_key: &[u8], ) -> Result<(), ArtError> where - F: FnOnce(Option<&V>) -> Option, + F: FnOnce(Option<&V>) -> UpdateAction, { let rnode = node.read_lock_or_restart()?; @@ -242,8 +249,14 @@ where let mut wparent = rparent.upgrade_to_write_lock_or_restart()?; let mut wnode = rnode.upgrade_to_write_lock_or_restart()?; - if let Some(new_value) = value_fn(None) { - insert_split_prefix(key, new_value, &mut wnode, &mut wparent, parent_key, guard)?; + match value_fn(None) { + UpdateAction::Nothing => {} + UpdateAction::Insert(new_value) => { + insert_split_prefix(key, new_value, &mut wnode, &mut wparent, parent_key, guard)?; + } + UpdateAction::Remove => { + panic!("unexpected Remove action on insertion"); + } } wnode.write_unlock(); wparent.write_unlock(); @@ -253,7 +266,34 @@ where let key = &key[prefix_match_len as usize..]; let level = level + prefix_match_len as usize; - let next_node = rnode.find_child_or_value_or_restart(key[0])?; + if rnode.is_leaf() { + assert_eq!(key.len(), 0); + let (rparent, parent_key) = rparent.expect("root cannot be leaf"); + let mut wparent = rparent.upgrade_to_write_lock_or_restart()?; + let mut wnode = rnode.upgrade_to_write_lock_or_restart()?; + + // safety: Now that we have acquired the write lock, we have exclusive access to the + // value. XXX: There might be concurrent reads though? + let value_mut = wnode.get_leaf_value_mut(); + + match value_fn(Some(value_mut)) { + UpdateAction::Nothing => {} + UpdateAction::Insert(_) => panic!("cannot insert over existing value"), + UpdateAction::Remove => { + // TODO: Shrink the node + // TODO: If the parent becomes empty, unlink it from grandparent + // TODO: If parent has only one child left, merge it with the child, extending its + // prefix + wparent.delete_child(parent_key); + } + } + wnode.write_unlock(); + wparent.write_unlock(); + + return Ok(()); + } + + let next_node = rnode.find_child_or_restart(key[0])?; if next_node.is_none() { if rnode.is_full() { @@ -261,63 +301,53 @@ where let mut wparent = rparent.upgrade_to_write_lock_or_restart()?; let wnode = rnode.upgrade_to_write_lock_or_restart()?; - if let Some(new_value) = value_fn(None) { - insert_and_grow(key, new_value, &wnode, &mut wparent, parent_key, guard)?; - wnode.write_unlock_obsolete(); - wparent.write_unlock(); - } else { - wnode.write_unlock(); - wparent.write_unlock(); - } + match value_fn(None) { + UpdateAction::Nothing => { + wnode.write_unlock(); + wparent.write_unlock(); + } + UpdateAction::Insert(new_value) => { + insert_and_grow(key, new_value, &wnode, &mut wparent, parent_key, guard)?; + wnode.write_unlock_obsolete(); + wparent.write_unlock(); + } + UpdateAction::Remove => { + panic!("unexpected Remove action on insertion"); + } + }; } else { let mut wnode = rnode.upgrade_to_write_lock_or_restart()?; if let Some((rparent, _)) = rparent { rparent.read_unlock_or_restart()?; } - if let Some(new_value) = value_fn(None) { - insert_to_node(&mut wnode, key, new_value, guard)?; - } + match value_fn(None) { + UpdateAction::Nothing => {} + UpdateAction::Insert(new_value) => { + insert_to_node(&mut wnode, key, new_value, guard)?; + } + UpdateAction::Remove => { + panic!("unexpected Remove action on insertion"); + } + }; wnode.write_unlock(); } return Ok(()); } else { - let next_node = next_node.unwrap(); // checked above it's not None + let next_child = next_node.unwrap(); // checked above it's not None if let Some((rparent, _)) = rparent { rparent.read_unlock_or_restart()?; } - match next_node { - ChildOrValue::Value(existing_value_ptr) => { - assert!(key.len() == 1); - let mut wnode = rnode.upgrade_to_write_lock_or_restart()?; - - // safety: Now that we have acquired the write lock, we have exclusive access to the - // value - let vmut = unsafe { existing_value_ptr.cast_mut().as_mut() }.unwrap(); - if let Some(new_value) = value_fn(Some(vmut)) { - *vmut = new_value; - } else { - // TODO: Shrink the node - // TODO: If the node becomes empty, unlink it from parent - wnode.delete_value(key[0]); - } - wnode.write_unlock(); - - Ok(()) - } - ChildOrValue::Child(next_child) => { - // recurse to next level - update_recurse( - &key[1..], - value_fn, - next_child, - Some((rnode, key[0])), - guard, - level + 1, - orig_key, - ) - } - } + // recurse to next level + update_recurse( + &key[1..], + value_fn, + next_child, + Some((rnode, key[0])), + guard, + level + 1, + orig_key, + ) } } @@ -351,10 +381,19 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>( path.push(PathElement::Prefix(Vec::from(prefix))); } + if rnode.is_leaf() { + let vptr = rnode.get_leaf_value_ptr()?; + // safety: It's OK to return a ref of the pointer because we checked the version + // and the lifetime of 'epoch_pin' enforces that the reference is only accessible + // as long as the epoch is pinned. + let val = unsafe { vptr.as_ref().unwrap() }; + eprintln!("{} {:?}: {:?}", indent, path, val); + } + for key_byte in 0..u8::MAX { - match rnode.find_child_or_value_or_restart(key_byte)? { + match rnode.find_child_or_restart(key_byte)? { None => continue, - Some(ChildOrValue::Child(child_ref)) => { + Some(child_ref) => { let rchild = child_ref.read_lock_or_restart()?; eprintln!( "{} {:?}, {}: prefix {:?}", @@ -369,11 +408,6 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>( dump_recurse(&child_path, child_ref, epoch_pin, level + 1)?; } - Some(ChildOrValue::Value(val)) => { - eprintln!("{} {:?}, {}: {:?}", indent, path, key_byte, unsafe { - val.as_ref().unwrap() - }); - } } } @@ -429,12 +463,8 @@ fn insert_to_node<'e, K: Key, V: Value, A: ArtAllocator>( value: V, guard: &'e TreeWriteGuard, ) -> Result<(), OutOfMemoryError> { - if wnode.is_leaf() { - wnode.insert_value(key[0], value); - } else { - let value_child = allocate_node_for_value(&key[1..], value, guard.tree_writer.allocator)?; - wnode.insert_child(key[0], value_child.into_ptr()); - } + let value_child = allocate_node_for_value(&key[1..], value, guard.tree_writer.allocator)?; + wnode.insert_child(key[0], value_child.into_ptr()); Ok(()) } @@ -448,13 +478,10 @@ fn insert_and_grow<'e, 'g, K: Key, V: Value, A: ArtAllocator>( guard: &'g mut TreeWriteGuard<'e, K, V, A>, ) -> Result<(), ArtError> { let mut bigger_node = wnode.grow(guard.tree_writer.allocator)?; - if wnode.is_leaf() { - bigger_node.insert_value(key[0], value); - } else { - // FIXME: deallocate 'bigger_node' on OOM - let value_child = allocate_node_for_value(&key[1..], value, guard.tree_writer.allocator)?; - bigger_node.insert_new_child(key[0], value_child); - } + + // FIXME: deallocate 'bigger_node' on OOM + let value_child = allocate_node_for_value(&key[1..], value, guard.tree_writer.allocator)?; + bigger_node.insert_new_child(key[0], value_child); // Replace the pointer in the parent parent.replace_child(parent_key_byte, bigger_node.into_ptr()); @@ -464,17 +491,16 @@ fn insert_and_grow<'e, 'g, K: Key, V: Value, A: ArtAllocator>( Ok(()) } -// Allocate a new leaf node to hold 'value'. If key is long, we may need to allocate -// new internal nodes to hold it too +// Allocate a new leaf node to hold 'value'. If the key is long, we +// may need to allocate new internal nodes to hold it too fn allocate_node_for_value<'a, V: Value, A: ArtAllocator>( key: &[u8], value: V, allocator: &'a A, ) -> Result, OutOfMemoryError> { - let mut prefix_off = key.len().saturating_sub(MAX_PREFIX_LEN + 1); + let mut prefix_off = key.len().saturating_sub(MAX_PREFIX_LEN); - let mut leaf_node = node_ref::new_leaf(&key[prefix_off..key.len() - 1], allocator)?; - leaf_node.insert_value(*key.last().unwrap(), value); + let leaf_node = node_ref::new_leaf(&key[prefix_off..key.len()], value, allocator)?; let mut node = leaf_node; while prefix_off > 0 { diff --git a/libs/neonart/src/algorithm/node_ptr.rs b/libs/neonart/src/algorithm/node_ptr.rs index 71e2c9f347..c616e8e1b0 100644 --- a/libs/neonart/src/algorithm/node_ptr.rs +++ b/libs/neonart/src/algorithm/node_ptr.rs @@ -13,10 +13,7 @@ enum NodeTag { Internal16, Internal48, Internal256, - Leaf4, - Leaf16, - Leaf48, - Leaf256, + Leaf, } #[repr(C)] @@ -31,6 +28,12 @@ pub(crate) struct NodePtr { phantom_value: PhantomData, } +impl PartialEq for NodePtr { + fn eq(&self, other: &NodePtr) -> bool { + self.ptr == other.ptr + } +} + impl std::fmt::Debug for NodePtr { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { write!(fmt, "0x{}", self.ptr.addr()) @@ -52,10 +55,7 @@ enum NodeVariant<'a, V> { Internal16(&'a NodeInternal16), Internal48(&'a NodeInternal48), Internal256(&'a NodeInternal256), - Leaf4(&'a NodeLeaf4), - Leaf16(&'a NodeLeaf16), - Leaf48(&'a NodeLeaf48), - Leaf256(&'a NodeLeaf256), + Leaf(&'a NodeLeaf), } enum NodeVariantMut<'a, V> { @@ -63,15 +63,7 @@ enum NodeVariantMut<'a, V> { Internal16(&'a mut NodeInternal16), Internal48(&'a mut NodeInternal48), Internal256(&'a mut NodeInternal256), - Leaf4(&'a mut NodeLeaf4), - Leaf16(&'a mut NodeLeaf16), - Leaf48(&'a mut NodeLeaf48), - Leaf256(&'a mut NodeLeaf256), -} - -pub(crate) enum ChildOrValuePtr { - Child(NodePtr), - Value(*const V), + Leaf(&'a mut NodeLeaf), } #[repr(C)] @@ -127,54 +119,14 @@ pub struct NodeInternal256 { } #[repr(C)] -pub struct NodeLeaf4 { +pub struct NodeLeaf { tag: NodeTag, lock_and_version: AtomicLockAndVersion, prefix: [u8; MAX_PREFIX_LEN], prefix_len: u8, - num_values: u8, - child_keys: [u8; 4], - child_values: [Option; 4], -} - -#[repr(C)] -pub struct NodeLeaf16 { - tag: NodeTag, - lock_and_version: AtomicLockAndVersion, - - prefix: [u8; MAX_PREFIX_LEN], - prefix_len: u8, - - num_values: u8, - child_keys: [u8; 16], - child_values: [Option; 16], -} - -#[repr(C)] -pub struct NodeLeaf48 { - tag: NodeTag, - lock_and_version: AtomicLockAndVersion, - - prefix: [u8; MAX_PREFIX_LEN], - prefix_len: u8, - - num_values: u8, - child_indexes: [u8; 256], - child_values: [Option; 48], -} - -#[repr(C)] -pub struct NodeLeaf256 { - tag: NodeTag, - lock_and_version: AtomicLockAndVersion, - - prefix: [u8; MAX_PREFIX_LEN], - prefix_len: u8, - - num_values: u16, - child_values: [Option; 256], + value: V, } impl NodePtr { @@ -184,10 +136,7 @@ impl NodePtr { NodeVariant::Internal16(_) => false, NodeVariant::Internal48(_) => false, NodeVariant::Internal256(_) => false, - NodeVariant::Leaf4(_) => true, - NodeVariant::Leaf16(_) => true, - NodeVariant::Leaf48(_) => true, - NodeVariant::Leaf256(_) => true, + NodeVariant::Leaf(_) => true, } } @@ -197,10 +146,7 @@ impl NodePtr { NodeVariant::Internal16(n) => &n.lock_and_version, NodeVariant::Internal48(n) => &n.lock_and_version, NodeVariant::Internal256(n) => &n.lock_and_version, - NodeVariant::Leaf4(n) => &n.lock_and_version, - NodeVariant::Leaf16(n) => &n.lock_and_version, - NodeVariant::Leaf48(n) => &n.lock_and_version, - NodeVariant::Leaf256(n) => &n.lock_and_version, + NodeVariant::Leaf(n) => &n.lock_and_version, } } @@ -230,17 +176,8 @@ impl NodePtr { NodeTag::Internal256 => NodeVariant::Internal256( NonNull::new_unchecked(self.ptr.cast::>()).as_ref(), ), - NodeTag::Leaf4 => NodeVariant::Leaf4( - NonNull::new_unchecked(self.ptr.cast::>()).as_ref(), - ), - NodeTag::Leaf16 => NodeVariant::Leaf16( - NonNull::new_unchecked(self.ptr.cast::>()).as_ref(), - ), - NodeTag::Leaf48 => NodeVariant::Leaf48( - NonNull::new_unchecked(self.ptr.cast::>()).as_ref(), - ), - NodeTag::Leaf256 => NodeVariant::Leaf256( - NonNull::new_unchecked(self.ptr.cast::>()).as_ref(), + NodeTag::Leaf => NodeVariant::Leaf( + NonNull::new_unchecked(self.ptr.cast::>()).as_ref(), ), } } @@ -261,17 +198,8 @@ impl NodePtr { NodeTag::Internal256 => NodeVariantMut::Internal256( NonNull::new_unchecked(self.ptr.cast::>()).as_mut(), ), - NodeTag::Leaf4 => NodeVariantMut::Leaf4( - NonNull::new_unchecked(self.ptr.cast::>()).as_mut(), - ), - NodeTag::Leaf16 => NodeVariantMut::Leaf16( - NonNull::new_unchecked(self.ptr.cast::>()).as_mut(), - ), - NodeTag::Leaf48 => NodeVariantMut::Leaf48( - NonNull::new_unchecked(self.ptr.cast::>()).as_mut(), - ), - NodeTag::Leaf256 => NodeVariantMut::Leaf256( - NonNull::new_unchecked(self.ptr.cast::>()).as_mut(), + NodeTag::Leaf => NodeVariantMut::Leaf( + NonNull::new_unchecked(self.ptr.cast::>()).as_mut(), ), } } @@ -295,10 +223,7 @@ impl NodePtr { NodeVariant::Internal16(n) => n.get_prefix(), NodeVariant::Internal48(n) => n.get_prefix(), NodeVariant::Internal256(n) => n.get_prefix(), - NodeVariant::Leaf4(n) => n.get_prefix(), - NodeVariant::Leaf16(n) => n.get_prefix(), - NodeVariant::Leaf48(n) => n.get_prefix(), - NodeVariant::Leaf256(n) => n.get_prefix(), + NodeVariant::Leaf(n) => n.get_prefix(), } } @@ -308,65 +233,27 @@ impl NodePtr { NodeVariant::Internal16(n) => n.is_full(), NodeVariant::Internal48(n) => n.is_full(), NodeVariant::Internal256(n) => n.is_full(), - NodeVariant::Leaf4(n) => n.is_full(), - NodeVariant::Leaf16(n) => n.is_full(), - NodeVariant::Leaf48(n) => n.is_full(), - NodeVariant::Leaf256(n) => n.is_full(), + NodeVariant::Leaf(_) => panic!("is_full() called on leaf node"), } } - pub(crate) fn find_child_or_value(&self, key_byte: u8) -> Option> { + pub(crate) fn find_child(&self, key_byte: u8) -> Option> { match self.variant() { - NodeVariant::Internal4(n) => n.find_child(key_byte).map(|c| ChildOrValuePtr::Child(c)), - NodeVariant::Internal16(n) => n.find_child(key_byte).map(|c| ChildOrValuePtr::Child(c)), - NodeVariant::Internal48(n) => n.find_child(key_byte).map(|c| ChildOrValuePtr::Child(c)), - NodeVariant::Internal256(n) => { - n.find_child(key_byte).map(|c| ChildOrValuePtr::Child(c)) - } - NodeVariant::Leaf4(n) => n - .get_leaf_value(key_byte) - .map(|v| ChildOrValuePtr::Value(v)), - NodeVariant::Leaf16(n) => n - .get_leaf_value(key_byte) - .map(|v| ChildOrValuePtr::Value(v)), - NodeVariant::Leaf48(n) => n - .get_leaf_value(key_byte) - .map(|v| ChildOrValuePtr::Value(v)), - NodeVariant::Leaf256(n) => n - .get_leaf_value(key_byte) - .map(|v| ChildOrValuePtr::Value(v)), + NodeVariant::Internal4(n) => n.find_child(key_byte), + NodeVariant::Internal16(n) => n.find_child(key_byte), + NodeVariant::Internal48(n) => n.find_child(key_byte), + NodeVariant::Internal256(n) => n.find_child(key_byte), + NodeVariant::Leaf(_) => panic!("find_child called on leaf node"), } } - pub(crate) fn find_next_child_or_value( - &self, - key_byte: u8, - ) -> Option<(u8, ChildOrValuePtr)> { + pub(crate) fn find_next_child(&self, key_byte: u8) -> Option<(u8, NodePtr)> { match self.variant() { - NodeVariant::Internal4(n) => n - .find_next_child(key_byte) - .map(|(k, c)| (k, ChildOrValuePtr::Child(c))), - NodeVariant::Internal16(n) => n - .find_next_child(key_byte) - .map(|(k, c)| (k, ChildOrValuePtr::Child(c))), - NodeVariant::Internal48(n) => n - .find_next_child(key_byte) - .map(|(k, c)| (k, ChildOrValuePtr::Child(c))), - NodeVariant::Internal256(n) => n - .find_next_child(key_byte) - .map(|(k, c)| (k, ChildOrValuePtr::Child(c))), - NodeVariant::Leaf4(n) => n - .find_next_leaf_value(key_byte) - .map(|(k, v)| (k, ChildOrValuePtr::Value(v))), - NodeVariant::Leaf16(n) => n - .find_next_leaf_value(key_byte) - .map(|(k, v)| (k, ChildOrValuePtr::Value(v))), - NodeVariant::Leaf48(n) => n - .find_next_leaf_value(key_byte) - .map(|(k, v)| (k, ChildOrValuePtr::Value(v))), - NodeVariant::Leaf256(n) => n - .find_next_leaf_value(key_byte) - .map(|(k, v)| (k, ChildOrValuePtr::Value(v))), + NodeVariant::Internal4(n) => n.find_next_child(key_byte), + NodeVariant::Internal16(n) => n.find_next_child(key_byte), + NodeVariant::Internal48(n) => n.find_next_child(key_byte), + NodeVariant::Internal256(n) => n.find_next_child(key_byte), + NodeVariant::Leaf(_) => panic!("find_next_child called on leaf node"), } } @@ -376,10 +263,7 @@ impl NodePtr { NodeVariantMut::Internal16(n) => n.truncate_prefix(new_prefix_len), NodeVariantMut::Internal48(n) => n.truncate_prefix(new_prefix_len), NodeVariantMut::Internal256(n) => n.truncate_prefix(new_prefix_len), - NodeVariantMut::Leaf4(n) => n.truncate_prefix(new_prefix_len), - NodeVariantMut::Leaf16(n) => n.truncate_prefix(new_prefix_len), - NodeVariantMut::Leaf48(n) => n.truncate_prefix(new_prefix_len), - NodeVariantMut::Leaf256(n) => n.truncate_prefix(new_prefix_len), + NodeVariantMut::Leaf(n) => n.truncate_prefix(new_prefix_len), } } @@ -389,10 +273,7 @@ impl NodePtr { NodeVariant::Internal16(n) => n.grow(allocator), NodeVariant::Internal48(n) => n.grow(allocator), NodeVariant::Internal256(_) => panic!("cannot grow Internal256 node"), - NodeVariant::Leaf4(n) => n.grow(allocator), - NodeVariant::Leaf16(n) => n.grow(allocator), - NodeVariant::Leaf48(n) => n.grow(allocator), - NodeVariant::Leaf256(_) => panic!("cannot grow Leaf256 node"), + NodeVariant::Leaf(_) => panic!("cannot grow Leaf node"), } } @@ -402,10 +283,7 @@ impl NodePtr { NodeVariantMut::Internal16(n) => n.insert_child(key_byte, child), NodeVariantMut::Internal48(n) => n.insert_child(key_byte, child), NodeVariantMut::Internal256(n) => n.insert_child(key_byte, child), - NodeVariantMut::Leaf4(_) - | NodeVariantMut::Leaf16(_) - | NodeVariantMut::Leaf48(_) - | NodeVariantMut::Leaf256(_) => panic!("insert_child called on leaf node"), + NodeVariantMut::Leaf(_) => panic!("insert_child called on leaf node"), } } @@ -415,36 +293,37 @@ impl NodePtr { NodeVariantMut::Internal16(n) => n.replace_child(key_byte, replacement), NodeVariantMut::Internal48(n) => n.replace_child(key_byte, replacement), NodeVariantMut::Internal256(n) => n.replace_child(key_byte, replacement), - NodeVariantMut::Leaf4(_) - | NodeVariantMut::Leaf16(_) - | NodeVariantMut::Leaf48(_) - | NodeVariantMut::Leaf256(_) => panic!("replace_child called on leaf node"), + NodeVariantMut::Leaf(_) => panic!("replace_child called on leaf node"), } } - pub(crate) fn insert_value(&mut self, key_byte: u8, value: V) { + pub(crate) fn delete_child(&mut self, key_byte: u8) { + match self.variant_mut() { + NodeVariantMut::Internal4(n) => n.delete_child(key_byte), + NodeVariantMut::Internal16(n) => n.delete_child(key_byte), + NodeVariantMut::Internal48(n) => n.delete_child(key_byte), + NodeVariantMut::Internal256(n) => n.delete_child(key_byte), + NodeVariantMut::Leaf(_) => panic!("delete_child called on leaf node"), + } + } + + pub(crate) fn get_leaf_value(&self) -> &V { + match self.variant() { + NodeVariant::Internal4(_) + | NodeVariant::Internal16(_) + | NodeVariant::Internal48(_) + | NodeVariant::Internal256(_) => panic!("get_leaf_value called on internal node"), + NodeVariant::Leaf(n) => n.get_leaf_value(), + } + } + + pub(crate) fn get_leaf_value_mut(&mut self) -> &mut V { match self.variant_mut() { NodeVariantMut::Internal4(_) | NodeVariantMut::Internal16(_) | NodeVariantMut::Internal48(_) - | NodeVariantMut::Internal256(_) => panic!("insert_value called on internal node"), - NodeVariantMut::Leaf4(n) => n.insert_value(key_byte, value), - NodeVariantMut::Leaf16(n) => n.insert_value(key_byte, value), - NodeVariantMut::Leaf48(n) => n.insert_value(key_byte, value), - NodeVariantMut::Leaf256(n) => n.insert_value(key_byte, value), - } - } - - pub(crate) fn delete_value(&mut self, key_byte: u8) { - match self.variant_mut() { - NodeVariantMut::Internal4(_) - | NodeVariantMut::Internal16(_) - | NodeVariantMut::Internal48(_) - | NodeVariantMut::Internal256(_) => panic!("delete_value called on internal node"), - NodeVariantMut::Leaf4(n) => n.delete_value(key_byte), - NodeVariantMut::Leaf16(n) => n.delete_value(key_byte), - NodeVariantMut::Leaf48(n) => n.delete_value(key_byte), - NodeVariantMut::Leaf256(n) => n.delete_value(key_byte), + | NodeVariantMut::Internal256(_) => panic!("get_leaf_value called on internal node"), + NodeVariantMut::Leaf(n) => n.get_leaf_value_mut(), } } @@ -454,10 +333,7 @@ impl NodePtr { NodeVariant::Internal16(_) => allocator.dealloc_node_internal16(self.ptr.cast()), NodeVariant::Internal48(_) => allocator.dealloc_node_internal48(self.ptr.cast()), NodeVariant::Internal256(_) => allocator.dealloc_node_internal256(self.ptr.cast()), - NodeVariant::Leaf4(_) => allocator.dealloc_node_leaf4(self.ptr.cast()), - NodeVariant::Leaf16(_) => allocator.dealloc_node_leaf16(self.ptr.cast()), - NodeVariant::Leaf48(_) => allocator.dealloc_node_leaf48(self.ptr.cast()), - NodeVariant::Leaf256(_) => allocator.dealloc_node_leaf256(self.ptr.cast()), + NodeVariant::Leaf(_) => allocator.dealloc_node_leaf(self.ptr.cast()), } } } @@ -497,21 +373,19 @@ pub fn new_internal(prefix: &[u8], allocator: &impl ArtAllocator) - ptr.into() } -pub fn new_leaf(prefix: &[u8], allocator: &impl ArtAllocator) -> NodePtr { - let ptr: *mut NodeLeaf4 = allocator.alloc_node_leaf4().cast(); +pub fn new_leaf(prefix: &[u8], value: V, allocator: &impl ArtAllocator) -> NodePtr { + let ptr: *mut NodeLeaf = allocator.alloc_node_leaf().cast(); if ptr.is_null() { panic!("out of memory"); } - let mut init = NodeLeaf4 { - tag: NodeTag::Leaf4, + let mut init = NodeLeaf { + tag: NodeTag::Leaf, lock_and_version: AtomicLockAndVersion::new(), prefix: [8; MAX_PREFIX_LEN], prefix_len: prefix.len() as u8, - num_values: 0, - child_keys: [0; 4], - child_values: [const { None }; 4], + value, }; init.prefix[0..prefix.len()].copy_from_slice(prefix); unsafe { ptr.write(init) }; @@ -574,6 +448,20 @@ impl NodeInternal4 { panic!("could not re-find parent with key {}", key_byte); } + fn delete_child(&mut self, key_byte: u8) { + for i in 0..self.num_children as usize { + if self.child_keys[i] == key_byte { + self.num_children -= 1; + for j in i..self.num_children as usize { + self.child_keys[j] = self.child_keys[j + 1]; + self.child_ptrs[j] = self.child_ptrs[j + 1]; + } + return; + } + } + panic!("could not re-find parent with key {}", key_byte); + } + fn is_full(&self) -> bool { self.num_children == 4 } @@ -667,6 +555,20 @@ impl NodeInternal16 { panic!("could not re-find parent with key {}", key_byte); } + fn delete_child(&mut self, key_byte: u8) { + for i in 0..self.num_children as usize { + if self.child_keys[i] == key_byte { + self.num_children -= 1; + for j in i..self.num_children as usize { + self.child_keys[j] = self.child_keys[j + 1]; + self.child_ptrs[j] = self.child_ptrs[j + 1]; + } + return; + } + } + panic!("could not re-find parent with key {}", key_byte); + } + fn is_full(&self) -> bool { self.num_children == 16 } @@ -742,11 +644,32 @@ impl NodeInternal48 { fn replace_child(&mut self, key_byte: u8, replacement: NodePtr) { let idx = self.child_indexes[key_byte as usize]; - if idx != INVALID_CHILD_INDEX { - self.child_ptrs[idx as usize] = replacement - } else { + if idx == INVALID_CHILD_INDEX { panic!("could not re-find parent with key {}", key_byte); } + self.child_ptrs[idx as usize] = replacement + } + + fn delete_child(&mut self, key_byte: u8) { + let idx = self.child_indexes[key_byte as usize] as usize; + if idx == INVALID_CHILD_INDEX as usize { + panic!("could not re-find parent with key {}", key_byte); + } + self.child_indexes[key_byte as usize] = INVALID_CHILD_INDEX; + self.num_children -= 1; + + // Compact the child_ptrs array + let removed_idx = self.num_children as usize; + if idx != removed_idx { + for i in 0..u8::MAX as usize { + if self.child_indexes[i] as usize == removed_idx { + self.child_indexes[i] = idx as u8; + self.child_ptrs[idx] = self.child_ptrs[removed_idx]; + return; + } + } + panic!("could not re-find last index on Internal48 node"); + } } fn is_full(&self) -> bool { @@ -830,6 +753,15 @@ impl NodeInternal256 { } } + fn delete_child(&mut self, key_byte: u8) { + let idx = key_byte as usize; + if self.child_ptrs[idx].is_null() { + panic!("could not re-find parent with key {}", key_byte); + } + self.num_children -= 1; + self.child_ptrs[idx] = NodePtr::null(); + } + fn is_full(&self) -> bool { self.num_children == 256 } @@ -842,7 +774,7 @@ impl NodeInternal256 { } } -impl NodeLeaf4 { +impl NodeLeaf { fn get_prefix(&self) -> &[u8] { &self.prefix[0..self.prefix_len as usize] } @@ -857,346 +789,12 @@ impl NodeLeaf4 { self.prefix_len = new_prefix_len as u8; } - fn get_leaf_value<'a: 'b, 'b>(&'a self, key: u8) -> Option<&'b V> { - for i in 0..self.num_values { - if self.child_keys[i as usize] == key { - assert!(self.child_values[i as usize].is_some()); - return self.child_values[i as usize].as_ref(); - } - } - None + fn get_leaf_value<'a: 'b, 'b>(&'a self) -> &'b V { + &self.value } - fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> { - let mut found: Option<(usize, u8)> = None; - for i in 0..self.num_values as usize { - let this_key = self.child_keys[i]; - if this_key >= min_key { - if let Some((_, found_key)) = found { - if this_key < found_key { - found = Some((i, this_key)); - } - } else { - found = Some((i, this_key)); - } - } - } - if let Some((found_idx, found_key)) = found { - Some((found_key, self.child_values[found_idx].as_ref().unwrap())) - } else { - None - } - } - - fn is_full(&self) -> bool { - self.num_values == 4 - } - - fn insert_value(&mut self, key_byte: u8, value: V) { - assert!(self.num_values < 4); - - let idx = self.num_values as usize; - self.child_keys[idx] = key_byte; - self.child_values[idx] = Some(value); - self.num_values += 1; - } - - fn grow(&self, allocator: &impl ArtAllocator) -> NodePtr { - let ptr: *mut NodeLeaf16 = allocator.alloc_node_leaf16(); - if ptr.is_null() { - panic!("out of memory"); - } - let mut init = NodeLeaf16 { - tag: NodeTag::Leaf16, - lock_and_version: AtomicLockAndVersion::new(), - - prefix: self.prefix.clone(), - prefix_len: self.prefix_len, - num_values: self.num_values, - - child_keys: [0; 16], - child_values: [const { None }; 16], - }; - for i in 0..self.num_values as usize { - init.child_keys[i] = self.child_keys[i]; - init.child_values[i] = self.child_values[i].clone(); - } - unsafe { ptr.write(init) }; - ptr.into() - } - - fn delete_value(&mut self, key_byte: u8) { - assert!(self.num_values <= 4); - - for i in 0..self.num_values as usize { - if self.child_keys[i] == key_byte { - assert!(self.child_values[i].is_some()); - if i < self.num_values as usize - 1 { - self.child_keys[i] = self.child_keys[self.num_values as usize - 1]; - self.child_values[i] = std::mem::replace( - &mut self.child_values[self.num_values as usize - 1], - None, - ); - } - self.num_values -= 1; - return; - } - } - panic!("key to delete not found in leaf4 node"); - } -} - -impl NodeLeaf16 { - fn get_prefix(&self) -> &[u8] { - &self.prefix[0..self.prefix_len as usize] - } - - fn truncate_prefix(&mut self, new_prefix_len: usize) { - assert!(new_prefix_len < self.prefix_len as usize); - let prefix = &mut self.prefix; - let offset = self.prefix_len as usize - new_prefix_len; - for i in 0..new_prefix_len { - prefix[i] = prefix[i + offset]; - } - self.prefix_len = new_prefix_len as u8; - } - - fn get_leaf_value(&self, key: u8) -> Option<&V> { - for i in 0..self.num_values { - if self.child_keys[i as usize] == key { - assert!(self.child_values[i as usize].is_some()); - return self.child_values[i as usize].as_ref(); - } - } - None - } - - fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> { - let mut found: Option<(usize, u8)> = None; - for i in 0..self.num_values as usize { - let this_key = self.child_keys[i]; - if this_key >= min_key { - if let Some((_, found_key)) = found { - if this_key < found_key { - found = Some((i, this_key)); - } - } else { - found = Some((i, this_key)); - } - } - } - if let Some((found_idx, found_key)) = found { - Some((found_key, self.child_values[found_idx].as_ref().unwrap())) - } else { - None - } - } - - fn is_full(&self) -> bool { - self.num_values == 16 - } - - fn insert_value(&mut self, key_byte: u8, value: V) { - assert!(self.num_values < 16); - - let idx = self.num_values as usize; - self.child_keys[idx] = key_byte; - self.child_values[idx] = Some(value); - self.num_values += 1; - } - fn grow(&self, allocator: &impl ArtAllocator) -> NodePtr { - let ptr: *mut NodeLeaf48 = allocator.alloc_node_leaf48().cast(); - if ptr.is_null() { - panic!("out of memory"); - } - let mut init = NodeLeaf48 { - tag: NodeTag::Leaf48, - lock_and_version: AtomicLockAndVersion::new(), - - prefix: self.prefix.clone(), - prefix_len: self.prefix_len, - num_values: self.num_values, - - child_indexes: [INVALID_CHILD_INDEX; 256], - child_values: [const { None }; 48], - }; - for i in 0..self.num_values { - let idx = self.child_keys[i as usize]; - init.child_indexes[idx as usize] = i; - init.child_values[i as usize] = self.child_values[i as usize].clone(); - } - unsafe { ptr.write(init) }; - ptr.into() - } - - fn delete_value(&mut self, key_byte: u8) { - assert!(self.num_values <= 16); - - for i in 0..self.num_values as usize { - if self.child_keys[i as usize] == key_byte { - assert!(self.child_values[i as usize].is_some()); - if i < self.num_values as usize - 1 { - self.child_keys[i] = self.child_keys[self.num_values as usize - 1]; - self.child_values[i] = std::mem::replace( - &mut self.child_values[self.num_values as usize - 1], - None, - ); - } - self.num_values -= 1; - return; - } - } - panic!("key to delete not found in leaf16 node"); - } -} - -impl NodeLeaf48 { - fn get_prefix(&self) -> &[u8] { - &self.prefix[0..self.prefix_len as usize] - } - - fn truncate_prefix(&mut self, new_prefix_len: usize) { - assert!(new_prefix_len < self.prefix_len as usize); - let prefix = &mut self.prefix; - let offset = self.prefix_len as usize - new_prefix_len; - for i in 0..new_prefix_len { - prefix[i] = prefix[i + offset]; - } - self.prefix_len = new_prefix_len as u8; - } - - fn get_leaf_value(&self, key: u8) -> Option<&V> { - let idx = self.child_indexes[key as usize]; - if idx != INVALID_CHILD_INDEX { - assert!(self.child_values[idx as usize].is_some()); - self.child_values[idx as usize].as_ref() - } else { - None - } - } - - fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> { - for key in min_key..=u8::MAX { - let idx = self.child_indexes[key as usize]; - if idx != INVALID_CHILD_INDEX { - return Some((key, &self.child_values[idx as usize].as_ref().unwrap())); - } - } - None - } - - fn is_full(&self) -> bool { - self.num_values == 48 - } - - fn insert_value(&mut self, key_byte: u8, value: V) { - assert!(self.num_values < 48); - assert!(self.child_indexes[key_byte as usize] == INVALID_CHILD_INDEX); - let idx = self.num_values; - self.child_indexes[key_byte as usize] = idx; - self.child_values[idx as usize] = Some(value); - self.num_values += 1; - } - fn grow(&self, allocator: &impl ArtAllocator) -> NodePtr { - let ptr: *mut NodeLeaf256 = allocator.alloc_node_leaf256(); - if ptr.is_null() { - panic!("out of memory"); - } - let mut init = NodeLeaf256 { - tag: NodeTag::Leaf256, - lock_and_version: AtomicLockAndVersion::new(), - - prefix: self.prefix.clone(), - prefix_len: self.prefix_len, - num_values: self.num_values as u16, - - child_values: [const { None }; 256], - }; - for i in 0..256 { - let idx = self.child_indexes[i]; - if idx != INVALID_CHILD_INDEX { - init.child_values[i] = self.child_values[idx as usize].clone(); - } - } - unsafe { ptr.write(init) }; - ptr.into() - } - - fn delete_value(&mut self, key_byte: u8) { - assert!(self.num_values <= 48); - - let idx = self.child_indexes[key_byte as usize]; - if idx == INVALID_CHILD_INDEX { - panic!("key to delete not found in leaf48 node"); - } - self.child_indexes[key_byte as usize] = INVALID_CHILD_INDEX; - self.num_values -= 1; - - if idx < self.num_values { - // Move all existing values with higher indexes down one position - for i in idx as usize..self.num_values as usize { - self.child_values[i] = std::mem::replace(&mut self.child_values[i + 1], None); - } - - // Update all higher indexes - for i in 0..256 { - if self.child_indexes[i] != INVALID_CHILD_INDEX { - if self.child_indexes[i] > idx { - self.child_indexes[i] -= 1; - } - assert!(self.child_indexes[i] < self.num_values); - } - } - } - } -} - -impl NodeLeaf256 { - fn get_prefix(&self) -> &[u8] { - &self.prefix[0..self.prefix_len as usize] - } - - fn truncate_prefix(&mut self, new_prefix_len: usize) { - assert!(new_prefix_len < self.prefix_len as usize); - let prefix = &mut self.prefix; - let offset = self.prefix_len as usize - new_prefix_len; - for i in 0..new_prefix_len { - prefix[i] = prefix[i + offset]; - } - self.prefix_len = new_prefix_len as u8; - } - - fn get_leaf_value(&self, key: u8) -> Option<&V> { - let idx = key as usize; - self.child_values[idx].as_ref() - } - - fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> { - for key in min_key..=u8::MAX { - if let Some(v) = &self.child_values[key as usize] { - return Some((key, v)); - } - } - None - } - - fn is_full(&self) -> bool { - self.num_values == 256 - } - - fn insert_value(&mut self, key_byte: u8, value: V) { - assert!(self.num_values < 256); - assert!(self.child_values[key_byte as usize].is_none()); - self.child_values[key_byte as usize] = Some(value); - self.num_values += 1; - } - - fn delete_value(&mut self, key_byte: u8) { - if self.child_values[key_byte as usize].is_none() { - panic!("key to delete not found in leaf256 node"); - } - self.child_values[key_byte as usize] = None; - self.num_values -= 1; + fn get_leaf_value_mut<'a: 'b, 'b>(&'a mut self) -> &'b mut V { + &mut self.value } } @@ -1250,34 +848,8 @@ impl From<*mut NodeInternal256> for NodePtr { } } -impl From<*mut NodeLeaf4> for NodePtr { - fn from(val: *mut NodeLeaf4) -> NodePtr { - NodePtr { - ptr: val.cast(), - phantom_value: PhantomData, - } - } -} -impl From<*mut NodeLeaf16> for NodePtr { - fn from(val: *mut NodeLeaf16) -> NodePtr { - NodePtr { - ptr: val.cast(), - phantom_value: PhantomData, - } - } -} - -impl From<*mut NodeLeaf48> for NodePtr { - fn from(val: *mut NodeLeaf48) -> NodePtr { - NodePtr { - ptr: val.cast(), - phantom_value: PhantomData, - } - } -} - -impl From<*mut NodeLeaf256> for NodePtr { - fn from(val: *mut NodeLeaf256) -> NodePtr { +impl From<*mut NodeLeaf> for NodePtr { + fn from(val: *mut NodeLeaf) -> NodePtr { NodePtr { ptr: val.cast(), phantom_value: PhantomData, diff --git a/libs/neonart/src/algorithm/node_ref.rs b/libs/neonart/src/algorithm/node_ref.rs index 12ab0e40db..e804ed8b24 100644 --- a/libs/neonart/src/algorithm/node_ref.rs +++ b/libs/neonart/src/algorithm/node_ref.rs @@ -2,7 +2,6 @@ use std::fmt::Debug; use std::marker::PhantomData; use super::node_ptr; -use super::node_ptr::ChildOrValuePtr; use super::node_ptr::NodePtr; use crate::EpochPin; use crate::Value; @@ -56,12 +55,11 @@ pub struct ReadLockedNodeRef<'e, V> { phantom: PhantomData<&'e EpochPin<'e>>, } -pub(crate) enum ChildOrValue<'e, V> { - Child(NodeRef<'e, V>), - Value(*const V), -} - impl<'e, V: Value> ReadLockedNodeRef<'e, V> { + pub(crate) fn is_leaf(&self) -> bool { + self.ptr.is_leaf() + } + pub(crate) fn is_full(&self) -> bool { self.ptr.is_full() } @@ -78,43 +76,51 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> { self.ptr.prefix_matches(key) } - pub(crate) fn find_child_or_value_or_restart( + pub(crate) fn find_child_or_restart( &self, key_byte: u8, - ) -> Result>, ConcurrentUpdateError> { - let child_or_value = self.ptr.find_child_or_value(key_byte); + ) -> Result>, ConcurrentUpdateError> { + let child_or_value = self.ptr.find_child(key_byte); self.ptr.lockword().check_or_restart(self.version)?; match child_or_value { None => Ok(None), - Some(ChildOrValuePtr::Value(vptr)) => Ok(Some(ChildOrValue::Value(vptr))), - Some(ChildOrValuePtr::Child(child_ptr)) => Ok(Some(ChildOrValue::Child(NodeRef { + Some(child_ptr) => Ok(Some(NodeRef { ptr: child_ptr, phantom: self.phantom, - }))), + })), } } - pub(crate) fn find_next_child_or_value_or_restart( + pub(crate) fn find_next_child_or_restart( &self, min_key_byte: u8, - ) -> Result)>, ConcurrentUpdateError> { - let child_or_value = self.ptr.find_next_child_or_value(min_key_byte); + ) -> Result)>, ConcurrentUpdateError> { + let child_or_value = self.ptr.find_next_child(min_key_byte); self.ptr.lockword().check_or_restart(self.version)?; match child_or_value { None => Ok(None), - Some((k, ChildOrValuePtr::Value(vptr))) => Ok(Some((k, ChildOrValue::Value(vptr)))), - Some((k, ChildOrValuePtr::Child(child_ptr))) => Ok(Some(( + Some((k, child_ptr)) => Ok(Some(( k, - ChildOrValue::Child(NodeRef { + NodeRef { ptr: child_ptr, phantom: self.phantom, - }), + }, ))), } } + pub(crate) fn get_leaf_value_ptr(&self) -> Result<*const V, ConcurrentUpdateError> { + let result = self.ptr.get_leaf_value(); + self.ptr.lockword().check_or_restart(self.version)?; + + // Extend the lifetime. + let result = std::ptr::from_ref(result); + + Ok(result) + } + pub(crate) fn upgrade_to_write_lock_or_restart( self, ) -> Result, ConcurrentUpdateError> { @@ -142,10 +148,6 @@ pub struct WriteLockedNodeRef<'e, V> { } impl<'e, V: Value> WriteLockedNodeRef<'e, V> { - pub(crate) fn is_leaf(&self) -> bool { - self.ptr.is_leaf() - } - pub(crate) fn write_unlock(mut self) { self.ptr.lockword().write_unlock(); self.ptr = NodePtr::null(); @@ -168,12 +170,8 @@ impl<'e, V: Value> WriteLockedNodeRef<'e, V> { self.ptr.insert_child(key_byte, child) } - pub(crate) fn insert_value(&mut self, key_byte: u8, value: V) { - self.ptr.insert_value(key_byte, value) - } - - pub(crate) fn delete_value(&mut self, key_byte: u8) { - self.ptr.delete_value(key_byte) + pub(crate) fn get_leaf_value_mut(&mut self) -> &mut V { + self.ptr.get_leaf_value_mut() } pub(crate) fn grow<'a, A>( @@ -199,6 +197,10 @@ impl<'e, V: Value> WriteLockedNodeRef<'e, V> { pub(crate) fn replace_child(&mut self, key_byte: u8, replacement: NodePtr) { self.ptr.replace_child(key_byte, replacement); } + + pub(crate) fn delete_child(&mut self, key_byte: u8) { + self.ptr.delete_child(key_byte); + } } impl<'e, V> Drop for WriteLockedNodeRef<'e, V> { @@ -229,10 +231,6 @@ where self.ptr.insert_child(key_byte, child.as_ptr()) } - pub(crate) fn insert_value(&mut self, key_byte: u8, value: V) { - self.ptr.insert_value(key_byte, value) - } - pub(crate) fn into_ptr(mut self) -> NodePtr { let ptr = self.ptr; self.ptr = NodePtr::null(); @@ -279,6 +277,7 @@ where pub(crate) fn new_leaf<'a, V, A>( prefix: &[u8], + value: V, allocator: &'a A, ) -> Result, OutOfMemoryError> where @@ -286,7 +285,7 @@ where A: ArtAllocator, { Ok(NewNodeRef { - ptr: node_ptr::new_leaf(prefix, allocator), + ptr: node_ptr::new_leaf(prefix, value, allocator), allocator, extra_nodes: Vec::new(), }) diff --git a/libs/neonart/src/allocator.rs b/libs/neonart/src/allocator.rs index 860d024269..fef89da4a2 100644 --- a/libs/neonart/src/allocator.rs +++ b/libs/neonart/src/allocator.rs @@ -15,8 +15,7 @@ use spin; use crate::ArtTreeStatistics; use crate::Tree; pub use crate::algorithm::node_ptr::{ - NodeInternal4, NodeInternal16, NodeInternal48, NodeInternal256, NodeLeaf4, NodeLeaf16, - NodeLeaf48, NodeLeaf256, + NodeInternal4, NodeInternal16, NodeInternal48, NodeInternal256, NodeLeaf, }; pub struct OutOfMemoryError(); @@ -28,19 +27,13 @@ pub trait ArtAllocator { fn alloc_node_internal16(&self) -> *mut NodeInternal16; fn alloc_node_internal48(&self) -> *mut NodeInternal48; fn alloc_node_internal256(&self) -> *mut NodeInternal256; - fn alloc_node_leaf4(&self) -> *mut NodeLeaf4; - fn alloc_node_leaf16(&self) -> *mut NodeLeaf16; - fn alloc_node_leaf48(&self) -> *mut NodeLeaf48; - fn alloc_node_leaf256(&self) -> *mut NodeLeaf256; + fn alloc_node_leaf(&self) -> *mut NodeLeaf; fn dealloc_node_internal4(&self, ptr: *mut NodeInternal4); fn dealloc_node_internal16(&self, ptr: *mut NodeInternal16); fn dealloc_node_internal48(&self, ptr: *mut NodeInternal48); fn dealloc_node_internal256(&self, ptr: *mut NodeInternal256); - fn dealloc_node_leaf4(&self, ptr: *mut NodeLeaf4); - fn dealloc_node_leaf16(&self, ptr: *mut NodeLeaf16); - fn dealloc_node_leaf48(&self, ptr: *mut NodeLeaf48); - fn dealloc_node_leaf256(&self, ptr: *mut NodeLeaf256); + fn dealloc_node_leaf(&self, ptr: *mut NodeLeaf); } pub struct ArtMultiSlabAllocator<'t, V> @@ -49,21 +42,18 @@ where { tree_area: spin::Mutex>>>, - inner: MultiSlabAllocator<'t, 8>, + inner: MultiSlabAllocator<'t, 5>, phantom_val: PhantomData, } impl<'t, V: crate::Value> ArtMultiSlabAllocator<'t, V> { - const LAYOUTS: [Layout; 8] = [ + const LAYOUTS: [Layout; 5] = [ Layout::new::>(), Layout::new::>(), Layout::new::>(), Layout::new::>(), - Layout::new::>(), - Layout::new::>(), - Layout::new::>(), - Layout::new::>(), + Layout::new::>(), ]; pub fn new(area: &'t mut [MaybeUninit]) -> &'t mut ArtMultiSlabAllocator<'t, V> { @@ -101,18 +91,9 @@ impl<'t, V: crate::Value> ArtAllocator for ArtMultiSlabAllocator<'t, V> { fn alloc_node_internal256(&self) -> *mut NodeInternal256 { self.inner.alloc_slab(3).cast() } - fn alloc_node_leaf4(&self) -> *mut NodeLeaf4 { + fn alloc_node_leaf(&self) -> *mut NodeLeaf { self.inner.alloc_slab(4).cast() } - fn alloc_node_leaf16(&self) -> *mut NodeLeaf16 { - self.inner.alloc_slab(5).cast() - } - fn alloc_node_leaf48(&self) -> *mut NodeLeaf48 { - self.inner.alloc_slab(6).cast() - } - fn alloc_node_leaf256(&self) -> *mut NodeLeaf256 { - self.inner.alloc_slab(7).cast() - } fn dealloc_node_internal4(&self, ptr: *mut NodeInternal4) { self.inner.dealloc_slab(0, ptr.cast()) @@ -127,18 +108,9 @@ impl<'t, V: crate::Value> ArtAllocator for ArtMultiSlabAllocator<'t, V> { fn dealloc_node_internal256(&self, ptr: *mut NodeInternal256) { self.inner.dealloc_slab(3, ptr.cast()) } - fn dealloc_node_leaf4(&self, ptr: *mut NodeLeaf4) { + fn dealloc_node_leaf(&self, ptr: *mut NodeLeaf) { self.inner.dealloc_slab(4, ptr.cast()) } - fn dealloc_node_leaf16(&self, ptr: *mut NodeLeaf16) { - self.inner.dealloc_slab(5, ptr.cast()) - } - fn dealloc_node_leaf48(&self, ptr: *mut NodeLeaf48) { - self.inner.dealloc_slab(6, ptr.cast()) - } - fn dealloc_node_leaf256(&self, ptr: *mut NodeLeaf256) { - self.inner.dealloc_slab(7, ptr.cast()) - } } impl<'t, V: crate::Value> ArtMultiSlabAllocator<'t, V> { diff --git a/libs/neonart/src/allocator/slab.rs b/libs/neonart/src/allocator/slab.rs index 4583ef17ae..6b69157af7 100644 --- a/libs/neonart/src/allocator/slab.rs +++ b/libs/neonart/src/allocator/slab.rs @@ -235,7 +235,6 @@ impl SlabDesc { } } - #[cfg(test)] mod tests { use super::*; diff --git a/libs/neonart/src/lib.rs b/libs/neonart/src/lib.rs index 88641379a1..4e93bf5c0e 100644 --- a/libs/neonart/src/lib.rs +++ b/libs/neonart/src/lib.rs @@ -20,8 +20,7 @@ //! //! - All keys have the same length //! -//! - Multi-value leaves. The values are stored directly in one of the four different leaf node -//! types. +//! - Single-value leaves. //! //! - For collapsing inner nodes, we use the Pessimistic approach, where each inner node stores a //! variable length "prefix", which stores the keys of all the one-way nodes which have been @@ -144,7 +143,7 @@ pub use allocator::ArtMultiSlabAllocator; /// Fixed-length key type. /// -pub trait Key: Clone + Debug { +pub trait Key: Debug { const KEY_LEN: usize; fn as_bytes(&self) -> &[u8]; @@ -154,7 +153,8 @@ pub trait Key: Clone + Debug { /// /// Values need to be Cloneable, because when a node "grows", the value is copied to a new node and /// the old sticks around until all readers that might see the old value are gone. -pub trait Value: Clone {} +// fixme obsolete, no longer needs Clone +pub trait Value {} const MAX_GARBAGE: usize = 1024; @@ -277,7 +277,7 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator> TreeInitStruct<'t, K, V, } } -impl<'t, K: Key + Clone, V: Value, A: ArtAllocator> TreeWriteAccess<'t, K, V, A> { +impl<'t, K: Key, V: Value, A: ArtAllocator> TreeWriteAccess<'t, K, V, A> { pub fn start_write<'g>(&'t self) -> TreeWriteGuard<'g, K, V, A> where 't: 'g, @@ -299,7 +299,7 @@ impl<'t, K: Key + Clone, V: Value, A: ArtAllocator> TreeWriteAccess<'t, K, V, } } -impl<'t, K: Key + Clone, V: Value> TreeReadAccess<'t, K, V> { +impl<'t, K: Key, V: Value> TreeReadAccess<'t, K, V> { pub fn start_read(&'t self) -> TreeReadGuard<'t, K, V> { TreeReadGuard { tree: &self.tree, @@ -340,23 +340,58 @@ where created_garbage: bool, } -impl<'t, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'t, K, V, A> { +pub enum UpdateAction { + Nothing, + Insert(V), + Remove, +} + +impl<'e, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'e, K, V, A> { /// Get a value - pub fn get(&'t mut self, key: &K) -> Option<&'t V> { + pub fn get(&'e mut self, key: &K) -> Option<&'e V> { algorithm::search(key, self.tree_writer.tree.root, &self.epoch_pin) } /// Insert a value - pub fn insert(self, key: &K, value: V) { - self.update_with_fn(key, |_| Some(value)) + pub fn insert(self, key: &K, value: V) -> Result<(), ()> { + let mut success = None; + + self.update_with_fn(key, |existing| { + if let Some(_) = existing { + success = Some(false); + UpdateAction::Nothing + } else { + success = Some(true); + UpdateAction::Insert(value) + } + }); + if success.expect("value_fn not called") { + Ok(()) + } else { + Err(()) + } } - /// Remove value - pub fn remove(self, key: &K) -> Option { + /// Remove value. Returns true if it existed + pub fn remove(self, key: &K) -> bool + { + let mut result = false; + self.update_with_fn(key, |existing| { + result = existing.is_some(); + UpdateAction::Remove + }); + result + } + + /// Try to remove value and return the old value. + pub fn remove_and_return(self, key: &K) -> Option + where + V: Clone, + { let mut old = None; self.update_with_fn(key, |existing| { old = existing.cloned(); - None + UpdateAction::Remove }); old } @@ -366,10 +401,10 @@ impl<'t, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'t, K, V, A> { /// The function is passed a reference to the existing value, if any. If the function /// returns None, the value is removed from the tree (or if there was no existing value, /// does nothing). If the function returns Some, the existing value is replaced, of if there - /// was no existing value, it is inserted. + /// was no existing value, it is inserted. FIXME: update comment pub fn update_with_fn(mut self, key: &K, value_fn: F) where - F: FnOnce(Option<&V>) -> Option, + F: FnOnce(Option<&V>) -> UpdateAction, { algorithm::update_fn(key, value_fn, self.tree_writer.tree.root, &mut self); @@ -511,12 +546,12 @@ fn increment_key(key: &mut [u8]) -> bool { } // Debugging functions -impl<'t, K: Key, V: Value + Debug> TreeReadGuard<'t, K, V> { +impl<'e, K: Key, V: Value + Debug> TreeReadGuard<'e, K, V> { pub fn dump(&mut self) { algorithm::dump_tree(self.tree.root, &self.epoch_pin) } } -impl<'t, K: Key, V: Value + Debug> TreeWriteGuard<'t, K, V, ArtMultiSlabAllocator<'t, V>> { +impl<'e, K: Key, V: Value + Debug> TreeWriteGuard<'e, K, V, ArtMultiSlabAllocator<'e, V>> { pub fn get_statistics(&self) -> ArtTreeStatistics { self.tree_writer.allocator.get_statistics() } diff --git a/libs/neonart/src/tests.rs b/libs/neonart/src/tests.rs index 0be971fde3..4d6d0aceed 100644 --- a/libs/neonart/src/tests.rs +++ b/libs/neonart/src/tests.rs @@ -1,11 +1,14 @@ use std::collections::BTreeMap; use std::collections::HashSet; +use std::fmt::{Debug, Formatter}; +use std::sync::atomic::{AtomicUsize, Ordering}; use crate::ArtAllocator; use crate::ArtMultiSlabAllocator; use crate::TreeInitStruct; use crate::TreeIterator; use crate::TreeWriteAccess; +use crate::UpdateAction; use crate::{Key, Value}; @@ -55,7 +58,8 @@ fn test_inserts + Copy>(keys: &[K]) { for (idx, k) in keys.iter().enumerate() { let w = tree_writer.start_write(); - w.insert(&(*k).into(), idx); + let res = w.insert(&(*k).into(), idx); + assert!(res.is_ok()); } for (idx, k) in keys.iter().enumerate() { @@ -103,12 +107,38 @@ fn sparse() { test_inserts(&keys); } -#[derive(Clone, Copy, Debug)] +struct TestValue(AtomicUsize); + +impl TestValue { + fn new(val: usize) -> TestValue { + TestValue(AtomicUsize::new(val)) + } + + fn load(&self) -> usize { + self.0.load(Ordering::Relaxed) + } +} + +impl Value for TestValue {} + +impl Clone for TestValue { + fn clone(&self) -> TestValue { + TestValue::new(self.load()) + } +} + +impl Debug for TestValue { + fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "{:?}", self.load()) + } +} + +#[derive(Clone, Debug)] struct TestOp(TestKey, Option); -fn apply_op>( +fn apply_op>( op: &TestOp, - tree: &TreeWriteAccess, + tree: &TreeWriteAccess, shadow: &mut BTreeMap, ) { eprintln!("applying op: {op:?}"); @@ -123,24 +153,33 @@ fn apply_op>( // apply to Art tree let w = tree.start_write(); w.update_with_fn(&op.0, |existing| { - assert_eq!(existing, shadow_existing.as_ref()); - return op.1; + assert_eq!(existing.map(TestValue::load), shadow_existing); + + match (existing, op.1) { + (None, None) => UpdateAction::Nothing, + (None, Some(new_val)) => UpdateAction::Insert(TestValue::new(new_val)), + (Some(_old_val), None) => UpdateAction::Remove, + (Some(old_val), Some(new_val)) => { + old_val.0.store(new_val, Ordering::Relaxed); + UpdateAction::Nothing + } + } }); } -fn test_iter>( - tree: &TreeWriteAccess, +fn test_iter>( + tree: &TreeWriteAccess, shadow: &BTreeMap, ) { let mut shadow_iter = shadow.iter(); let mut iter = TreeIterator::new(&(TestKey::MIN..TestKey::MAX)); loop { - let shadow_item = shadow_iter.next().map(|(k, v)| (k.clone(), v)); + let shadow_item = shadow_iter.next().map(|(k, v)| (k.clone(), v.clone())); let r = tree.start_read(); let item = iter.next(&r); - if shadow_item != item { + if shadow_item != item.map(|(k, v)| (k, v.load())) { eprintln!( "FAIL: iterator returned {:?}, expected {:?}", item, shadow_item @@ -170,7 +209,7 @@ fn random_ops() { let allocator = ArtMultiSlabAllocator::new(&mut area); - let init_struct = TreeInitStruct::::new(allocator); + let init_struct = TreeInitStruct::::new(allocator); let tree_writer = init_struct.attach_writer(); let mut shadow: std::collections::BTreeMap = BTreeMap::new(); diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml index fba4a5067c..d5eab85930 100644 --- a/pgxn/neon/communicator/Cargo.toml +++ b/pgxn/neon/communicator/Cargo.toml @@ -9,6 +9,7 @@ crate-type = ["staticlib"] [dependencies] axum.workspace = true bytes.workspace = true +clashmap.workspace = true http.workspace = true libc.workspace = true nix.workspace = true diff --git a/pgxn/neon/communicator/src/file_cache.rs b/pgxn/neon/communicator/src/file_cache.rs index ee3964e283..d754428fa5 100644 --- a/pgxn/neon/communicator/src/file_cache.rs +++ b/pgxn/neon/communicator/src/file_cache.rs @@ -20,6 +20,8 @@ use tokio::task::spawn_blocking; pub type CacheBlock = u64; +pub const INVALID_CACHE_BLOCK: CacheBlock = u64::MAX; + pub struct FileCache { file: Arc, @@ -39,10 +41,7 @@ struct FreeList { } impl FileCache { - pub fn new( - file_cache_path: &Path, - mut initial_size: u64, - ) -> Result { + pub fn new(file_cache_path: &Path, mut initial_size: u64) -> Result { if initial_size < 100 { tracing::warn!( "min size for file cache is 100 blocks, {} requested", @@ -95,7 +94,8 @@ impl FileCache { let dst_ref = unsafe { std::slice::from_raw_parts_mut(dst.stable_mut_ptr(), BLCKSZ) }; - spawn_blocking(move || file.read_exact_at(dst_ref, cache_block as u64 * BLCKSZ as u64)).await??; + spawn_blocking(move || file.read_exact_at(dst_ref, cache_block as u64 * BLCKSZ as u64)) + .await??; Ok(()) } @@ -109,7 +109,8 @@ impl FileCache { let src_ref = unsafe { std::slice::from_raw_parts(src.stable_ptr(), BLCKSZ) }; - spawn_blocking(move || file.write_all_at(src_ref, cache_block as u64 * BLCKSZ as u64)).await??; + spawn_blocking(move || file.write_all_at(src_ref, cache_block as u64 * BLCKSZ as u64)) + .await??; Ok(()) } diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index bc1dd1ea00..415684a6fc 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -24,15 +24,17 @@ use std::mem::MaybeUninit; use std::ops::Range; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; -use utils::lsn::Lsn; +use utils::lsn::{Lsn, AtomicLsn}; use zerocopy::FromBytes; use crate::file_cache::{CacheBlock, FileCache}; +use crate::file_cache::INVALID_CACHE_BLOCK; use pageserver_page_api::model::RelTag; use neonart; +use neonart::UpdateAction; use neonart::TreeInitStruct; use neonart::TreeIterator; @@ -123,36 +125,25 @@ impl<'t> IntegratedCacheInitStruct<'t> { } } -#[derive(Clone)] enum TreeEntry { Rel(RelEntry), Block(BlockEntry), } struct BlockEntry { - lw_lsn: Lsn, - cache_block: Option, + lw_lsn: AtomicLsn, + cache_block: AtomicU64, - io_in_progress: AtomicBool, + pinned: AtomicBool, // 'referenced' bit for the clock algorithm referenced: AtomicBool, } -impl Clone for BlockEntry { - fn clone(&self) -> BlockEntry { - BlockEntry { - lw_lsn: self.lw_lsn, - cache_block: self.cache_block, - referenced: AtomicBool::new(self.referenced.load(Ordering::Relaxed)), - } - } -} - -#[derive(Clone, Default)] struct RelEntry { /// cached size of the relation - nblocks: Option, + /// u32::MAX means 'not known' (that's InvalidBlockNumber in Postgres) + nblocks: AtomicU32, } #[derive( @@ -272,7 +263,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> { }; block_entry.referenced.store(true, Ordering::Relaxed); - if let Some(cache_block) = block_entry.cache_block { + let cache_block = block_entry.cache_block.load(Ordering::Relaxed); + if cache_block != INVALID_CACHE_BLOCK { self.file_cache .as_ref() .unwrap() @@ -280,7 +272,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { .await?; Ok(CacheResult::Found(())) } else { - Ok(CacheResult::NotFound(block_entry.lw_lsn)) + Ok(CacheResult::NotFound(block_entry.lw_lsn.load())) } } else { let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); @@ -305,10 +297,12 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // in cache. block_entry.referenced.store(true, Ordering::Relaxed); - if let Some(_cache_block) = block_entry.cache_block { + let cache_block = block_entry.cache_block.load(Ordering::Relaxed); + + if cache_block != INVALID_CACHE_BLOCK { Ok(CacheResult::Found(())) } else { - Ok(CacheResult::NotFound(block_entry.lw_lsn)) + Ok(CacheResult::NotFound(block_entry.lw_lsn.load())) } } else { let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); @@ -341,12 +335,19 @@ impl<'t> IntegratedCacheWriteAccess<'t> { pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) { let w = self.cache_tree.start_write(); - w.insert( - &TreeKey::from(rel), - TreeEntry::Rel(RelEntry { - nblocks: Some(nblocks), - }), - ); + w.update_with_fn(&TreeKey::from(rel), |existing| { + match existing { + None => UpdateAction::Insert( + TreeEntry::Rel(RelEntry { + nblocks: AtomicU32::new(nblocks), + })), + Some(TreeEntry::Block(_)) => panic!("unexpected tree entry type for rel key"), + Some(TreeEntry::Rel(rel)) => { + rel.nblocks.store(nblocks, Ordering::Relaxed); + UpdateAction::Nothing + } + } + }); } /// Remember the given page contents in the cache. @@ -356,58 +357,159 @@ impl<'t> IntegratedCacheWriteAccess<'t> { block_number: u32, src: impl uring_common::buf::IoBuf + Send + Sync, lw_lsn: Lsn, + is_write: bool, ) { - if let Some(file_cache) = self.file_cache.as_ref() { + let key = TreeKey::from((rel, block_number)); + + // FIXME: make this work when file cache is disabled. Or make it mandatory + let file_cache = self.file_cache.as_ref().unwrap(); + + if is_write { + // there should be no concurrent IOs. If a backend tries to read the page + // at the same time, they may get a torn write. That's the same as with + // regular POSIX filesystem read() and write() + + // First check if we have a block in cache already let w = self.cache_tree.start_write(); - let key = TreeKey::from((rel, block_number)); - - let mut reserved_cache_block = loop { - if let Some(x) = file_cache.alloc_block() { - break Some(x); - } - if let Some(x) = self.try_evict_one_cache_block() { - break Some(x); - } - }; - - let mut cache_block = None; + let mut old_cache_block = None; + let mut found_existing = false; w.update_with_fn(&key, |existing| { if let Some(existing) = existing { - let mut block_entry = if let TreeEntry::Block(e) = existing.clone() { + let block_entry = if let TreeEntry::Block(e) = existing { e } else { panic!("unexpected tree entry type for block key"); }; - block_entry.referenced.store(true, Ordering::Relaxed); - block_entry.lw_lsn = lw_lsn; - if block_entry.cache_block.is_none() { - block_entry.cache_block = reserved_cache_block.take(); + + found_existing = true; + + // Prevent this entry from being evicted + let was_pinned = block_entry.pinned.swap(true, Ordering::Relaxed); + if was_pinned { + // this is unexpected, because the caller has obtained the io-in-progress lock, + // so no one else should try to modify the page at the same time. + panic!("block entry was unexpectedly pinned"); + } + + let cache_block = block_entry.cache_block.load(Ordering::Relaxed); + old_cache_block = if cache_block != INVALID_CACHE_BLOCK { + Some(cache_block) + } else { + None + }; + } + // if there was no existing entry, we will insert one, but not yet + UpdateAction::Nothing + }); + + // Allocate a new block if required + let cache_block = old_cache_block.unwrap_or_else(|| { + loop { + if let Some(x) = file_cache.alloc_block() { + break x; + } + if let Some(x) = self.try_evict_one_cache_block() { + break x; } - cache_block = block_entry.cache_block; - Some(TreeEntry::Block(block_entry)) - } else { - cache_block = reserved_cache_block.take(); - Some(TreeEntry::Block(BlockEntry { - lw_lsn: lw_lsn, - cache_block: cache_block, - referenced: AtomicBool::new(true), - })) } }); - // If we didn't need to block we reserved, put it back to the free list - if let Some(x) = reserved_cache_block { - file_cache.dealloc_block(x); - } - - let cache_block = cache_block.unwrap(); + // Write the page to the cache file file_cache .write_block(cache_block, src) .await .expect("error writing to cache"); - }; + // FIXME: handle errors gracefully. + // FIXME: unpin the block entry on error + + // Update the block entry + let w = self.cache_tree.start_write(); + w.update_with_fn(&key, |existing| { + assert_eq!(found_existing, existing.is_some()); + if let Some(existing) = existing { + let block_entry = if let TreeEntry::Block(e) = existing { + e + } else { + panic!("unexpected tree entry type for block key"); + }; + + // Update the cache block + let old_blk = block_entry.cache_block.compare_exchange(INVALID_CACHE_BLOCK, cache_block, Ordering::Relaxed, Ordering::Relaxed); + assert!(old_blk == Ok(INVALID_CACHE_BLOCK) || old_blk == Err(cache_block)); + + block_entry.lw_lsn.store(lw_lsn); + + block_entry.referenced.store(true, Ordering::Relaxed); + + let was_pinned = block_entry.pinned.swap(false, Ordering::Relaxed); + assert!(was_pinned); + UpdateAction::Nothing + } + else + { + UpdateAction::Insert(TreeEntry::Block(BlockEntry { + lw_lsn: AtomicLsn::new(lw_lsn.0), + cache_block: AtomicU64::new(cache_block), + pinned: AtomicBool::new(false), + referenced: AtomicBool::new(true), + })) + } + }); + } else { + // !is_write + // + // We can assume that it doesn't already exist, because the + // caller is assumed to have already checked it, and holds + // the io-in-progress lock. (The BlockEntry might exist, but no cache block) + + // Allocate a new block first + let cache_block = { + loop { + if let Some(x) = file_cache.alloc_block() { + break x; + } + if let Some(x) = self.try_evict_one_cache_block() { + break x; + } + } + }; + + // Write the page to the cache file + file_cache + .write_block(cache_block, src) + .await + .expect("error writing to cache"); + // FIXME: handle errors gracefully. + + let w = self.cache_tree.start_write(); + + w.update_with_fn(&key, |existing| { + if let Some(existing) = existing { + let block_entry = if let TreeEntry::Block(e) = existing { + e + } else { + panic!("unexpected tree entry type for block key"); + }; + + assert!(!block_entry.pinned.load(Ordering::Relaxed)); + + let old_cache_block = block_entry.cache_block.swap(cache_block, Ordering::Relaxed); + if old_cache_block != INVALID_CACHE_BLOCK { + panic!("remember_page called in !is_write mode, but page is already cached at blk {}", old_cache_block); + } + UpdateAction::Nothing + } else { + UpdateAction::Insert(TreeEntry::Block(BlockEntry { + lw_lsn: AtomicLsn::new(lw_lsn.0), + cache_block: AtomicU64::new(cache_block), + pinned: AtomicBool::new(false), + referenced: AtomicBool::new(true), + })) + } + }); + } } /// Forget information about given relation in the cache. (For DROP TABLE and such) @@ -447,17 +549,26 @@ impl<'t> IntegratedCacheWriteAccess<'t> { if !blk_entry.referenced.swap(false, Ordering::Relaxed) { // Evict this let w = self.cache_tree.start_write(); - let old = w.remove(&k); - if let Some(TreeEntry::Block(old)) = old { - let _ = self - .global_lw_lsn - .fetch_max(old.lw_lsn.0, Ordering::Relaxed); - if let Some(cache_block) = old.cache_block { - return Some(cache_block); + + let mut evicted_cache_block = None; + w.update_with_fn(&k, |old| { + match old { + None => UpdateAction::Nothing, + Some(TreeEntry::Rel(_)) => panic!("unexepcted Rel entry"), + Some(TreeEntry::Block(old)) => { + let _ = self + .global_lw_lsn + .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); + let cache_block = old.cache_block.load(Ordering::Relaxed); + if cache_block != INVALID_CACHE_BLOCK { + evicted_cache_block = Some(cache_block); + } + // TODO: we don't evict the entry, just the block. Does it make + // sense to keep the entry? + UpdateAction::Nothing + } } - } else { - assert!(old.is_none()); - } + }); } } } @@ -479,7 +590,8 @@ fn get_rel_size<'t>(r: &neonart::TreeReadGuard, rel: &RelTag panic!("unexpected tree entry type for rel key"); }; - if let Some(nblocks) = rel_entry.nblocks { + let nblocks = rel_entry.nblocks.load(Ordering::Relaxed); + if nblocks != u32::MAX { Some(nblocks) } else { None @@ -526,7 +638,12 @@ impl<'e> BackendCacheReadOp<'e> { }; block_entry.referenced.store(true, Ordering::Relaxed); - block_entry.cache_block + let cache_block = block_entry.cache_block.load(Ordering::Relaxed); + if cache_block != INVALID_CACHE_BLOCK { + Some(cache_block) + } else { + None + } } else { None } diff --git a/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs b/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs new file mode 100644 index 0000000000..378f114d8d --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs @@ -0,0 +1,81 @@ +use std::hash::Hash; +use std::sync::Arc; +use std::cmp::Eq; + +use tokio::sync::{Mutex, OwnedMutexGuard}; + +use clashmap::ClashMap; +use clashmap::Entry; + +use pageserver_page_api::model; + +#[derive(Clone, Eq, Hash, PartialEq)] +pub enum RequestInProgressKey { + Db(u32), + Rel(model::RelTag), + Block(model::RelTag, u32), +} + +pub type RequestInProgressTable = MutexHashSet; + +// more primitive locking thingie: + +pub struct MutexHashSet + where K: Clone + Eq + Hash +{ + lock_table: ClashMap>>, +} + +pub struct MutexHashSetGuard<'a, K> + where K: Clone + Eq + Hash +{ + pub key: K, + set: &'a MutexHashSet, + mutex: Arc>, + _guard: OwnedMutexGuard<()>, +} + +impl<'a, K> Drop for MutexHashSetGuard<'a, K> + where K: Clone + Eq + Hash +{ + fn drop(&mut self) { + let (_old_key, old_val) = self.set.lock_table.remove(&self.key).unwrap(); + assert!(Arc::ptr_eq(&old_val, &self.mutex)); + + // the guard will be dropped as we return + } +} + +impl MutexHashSet + where K: Clone + Eq + Hash +{ + pub fn new() -> MutexHashSet { + MutexHashSet { + lock_table: ClashMap::new(), + } + } + + pub async fn lock<'a>(&'a self, key: K) -> MutexHashSetGuard<'a, K> + { + let my_mutex = Arc::new(Mutex::new(())); + let my_guard = Arc::clone(&my_mutex).lock_owned().await; + + loop { + let lock = match self.lock_table.entry(key.clone()) { + Entry::Occupied(e) => Arc::clone(e.get()), + Entry::Vacant(e) => { + e.insert(Arc::clone(&my_mutex)); + break; + } + }; + let _ = lock.lock().await; + } + + MutexHashSetGuard { + key: key, + set: &self, + mutex: my_mutex, + _guard: my_guard, + } + } +} diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 90574da6f3..8670f860dd 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -8,12 +8,13 @@ use crate::init::CommunicatorInitStruct; use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess}; use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest}; use crate::neon_request::{NeonIORequest, NeonIOResult}; +use crate::worker_process::in_progress_ios::{RequestInProgressTable, RequestInProgressKey}; use pageserver_client_grpc::PageserverClient; use pageserver_page_api::model; use tokio::io::AsyncReadExt; -use uring_common::buf::IoBuf; use tokio_pipe::PipeRead; +use uring_common::buf::IoBuf; use super::callbacks::{get_request_lsn, notify_proc}; @@ -31,8 +32,11 @@ pub struct CommunicatorWorkerProcessStruct<'a> { submission_pipe_read_raw_fd: i32, next_request_id: AtomicU64, + + in_progress_table: RequestInProgressTable, } + pub(super) async fn init( cis: Box, tenant_id: String, @@ -45,10 +49,7 @@ pub(super) async fn init( let last_lsn = get_request_lsn(); let file_cache = if let Some(path) = file_cache_path { - Some( - FileCache::new(&path, file_cache_size) - .expect("could not create cache file"), - ) + Some(FileCache::new(&path, file_cache_size).expect("could not create cache file")) } else { // FIXME: temporarily for testing, use LFC even if disabled Some( @@ -70,6 +71,7 @@ pub(super) async fn init( cache, submission_pipe_read_raw_fd: cis.submission_pipe_read_fd, next_request_id: AtomicU64::new(1), + in_progress_table: RequestInProgressTable::new(), }; this @@ -142,6 +144,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { NeonIORequest::RelExists(req) => { let rel = req.reltag(); + let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone())); + let not_modified_since = match self.cache.get_rel_exists(&rel) { CacheResult::Found(exists) => return NeonIOResult::RelExists(exists), CacheResult::NotFound(lsn) => lsn, @@ -166,6 +170,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { NeonIORequest::RelSize(req) => { let rel = req.reltag(); + let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone())); + // Check the cache first let not_modified_since = match self.cache.get_rel_size(&rel) { CacheResult::Found(nblocks) => { @@ -207,6 +213,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { NeonIOResult::PrefetchVLaunched } NeonIORequest::DbSize(req) => { + let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Db(req.db_oid)); + // Check the cache first let not_modified_since = match self.cache.get_db_size(req.db_oid) { CacheResult::Found(db_size) => { @@ -236,30 +244,36 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { NeonIORequest::WritePage(req) => { // Also store it in the LFC while we still have it let rel = req.reltag(); + let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), req.block_number)); self.cache - .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn)) + .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true) .await; NeonIOResult::WriteOK } NeonIORequest::RelExtend(req) => { + // TODO: need to grab an io-in-progress lock for this? I guess not self.cache .remember_rel_size(&req.reltag(), req.block_number + 1); NeonIOResult::WriteOK } NeonIORequest::RelZeroExtend(req) => { + // TODO: need to grab an io-in-progress lock for this? I guess not self.cache .remember_rel_size(&req.reltag(), req.block_number + req.nblocks); NeonIOResult::WriteOK } NeonIORequest::RelCreate(req) => { + // TODO: need to grab an io-in-progress lock for this? I guess not self.cache.remember_rel_size(&req.reltag(), 0); NeonIOResult::WriteOK } NeonIORequest::RelTruncate(req) => { + // TODO: need to grab an io-in-progress lock for this? I guess not self.cache.remember_rel_size(&req.reltag(), req.nblocks); NeonIOResult::WriteOK } NeonIORequest::RelUnlink(req) => { + // TODO: need to grab an io-in-progress lock for this? I guess not self.cache.forget_rel(&req.reltag()); NeonIOResult::WriteOK } @@ -270,9 +284,14 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { let rel = req.reltag(); // Check the cache first - let mut cache_misses = Vec::new(); + let mut cache_misses = Vec::with_capacity(req.nblocks as usize); for i in 0..req.nblocks { let blkno = req.block_number + i as u32; + + // note: this is deadlock-safe even though we hold multiple locks at the same time, + // because they're always acquired in the same order. + let in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), blkno)).await; + let dest = req.dest[i as usize]; let not_modified_since = match self.cache.get_page(&rel, blkno, dest).await { Ok(CacheResult::Found(_)) => { @@ -283,19 +302,19 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { Ok(CacheResult::NotFound(lsn)) => lsn, Err(_io_error) => return Err(-1), // FIXME errno? }; - cache_misses.push((blkno, not_modified_since, dest)); + cache_misses.push((blkno, not_modified_since, dest, in_progress_guard)); } if cache_misses.is_empty() { return Ok(()); } let not_modified_since = cache_misses .iter() - .map(|(_blkno, lsn, _dest)| *lsn) + .map(|(_blkno, lsn, _dest, _guard)| *lsn) .max() .unwrap(); // TODO: Use batched protocol - for (blkno, _lsn, dest) in cache_misses.iter() { + for (blkno, _lsn, dest, _guard) in cache_misses.iter() { match self .pageserver_client .get_page(&model::GetPageRequest { @@ -316,11 +335,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { std::ptr::copy_nonoverlapping(src.as_ptr(), dest.as_mut_ptr(), len); }; - trace!("remembering blk {} in rel {:?} in LFC", blkno, rel); - // Also store it in the LFC while we have it self.cache - .remember_page(&rel, *blkno, page_image, not_modified_since) + .remember_page(&rel, *blkno, page_image, not_modified_since, false) .await; } Err(err) => { @@ -339,29 +356,34 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { let rel = req.reltag(); // Check the cache first - let mut cache_misses = Vec::new(); + let mut cache_misses = Vec::with_capacity(req.nblocks as usize); for i in 0..req.nblocks { let blkno = req.block_number + i as u32; + + // note: this is deadlock-safe even though we hold multiple locks at the same time, + // because they're always acquired in the same order. + let in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), blkno)).await; + let not_modified_since = match self.cache.page_is_cached(&rel, blkno).await { Ok(CacheResult::Found(_)) => { - trace!("found blk {} in rel {:?} in LFC ", req.block_number, rel); + trace!("found blk {} in rel {:?} in LFC ", blkno, rel); continue; } Ok(CacheResult::NotFound(lsn)) => lsn, Err(_io_error) => return Err(-1), // FIXME errno? }; - cache_misses.push((req.block_number, not_modified_since)); + cache_misses.push((blkno, not_modified_since, in_progress_guard)); } if cache_misses.is_empty() { return Ok(()); } - let not_modified_since = cache_misses.iter().map(|(_blkno, lsn)| *lsn).max().unwrap(); + let not_modified_since = cache_misses.iter().map(|(_blkno, lsn, _guard)| *lsn).max().unwrap(); // TODO: spawn separate tasks for these. Use the integrated cache to keep track of the // in-flight requests // TODO: Use batched protocol - for (blkno, _lsn) in cache_misses.iter() { + for (blkno, _lsn, _guard) in cache_misses.iter() { match self .pageserver_client .get_page(&model::GetPageRequest { @@ -376,10 +398,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { Ok(page_image) => { trace!( "prefetch completed, remembering blk {} in rel {:?} in LFC", - req.block_number, rel + *blkno, rel ); self.cache - .remember_page(&rel, req.block_number, page_image, not_modified_since) + .remember_page(&rel, *blkno, page_image, not_modified_since, false) .await; } Err(err) => { diff --git a/pgxn/neon/communicator/src/worker_process/mod.rs b/pgxn/neon/communicator/src/worker_process/mod.rs index 760d8853b0..064d106d4c 100644 --- a/pgxn/neon/communicator/src/worker_process/mod.rs +++ b/pgxn/neon/communicator/src/worker_process/mod.rs @@ -10,3 +10,5 @@ mod logging; mod main_loop; mod metrics_exporter; mod worker_interface; + +mod in_progress_ios;