implement shrinking nodes

This commit is contained in:
Heikki Linnakangas
2025-05-12 03:57:02 +03:00
parent 731667ac37
commit 8abb4dab6d
3 changed files with 240 additions and 15 deletions

View File

@@ -102,6 +102,7 @@ pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>, F>(
this_value_fn,
root_ref,
None,
None,
guard,
0,
key_bytes,
@@ -234,6 +235,7 @@ pub(crate) fn update_recurse<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>, F>(
value_fn: F,
node: NodeRef<'e, V>,
rparent: Option<(ReadLockedNodeRef<V>, u8)>,
rgrandparent: Option<(ReadLockedNodeRef<V>, u8)>,
guard: &'g mut TreeWriteGuard<'e, K, V, A>,
level: usize,
orig_key: &[u8],
@@ -277,18 +279,33 @@ where
let value_mut = wnode.get_leaf_value_mut();
match value_fn(Some(value_mut)) {
UpdateAction::Nothing => {}
UpdateAction::Nothing => {
wparent.write_unlock();
wnode.write_unlock();
}
UpdateAction::Insert(_) => panic!("cannot insert over existing value"),
UpdateAction::Remove => {
// TODO: Shrink the node
// TODO: If the parent becomes empty, unlink it from grandparent
// TODO: If parent has only one child left, merge it with the child, extending its
// prefix
wparent.delete_child(parent_key);
if wparent.can_shrink_after_delete() {
if let Some((rgrandparent, grandparent_key)) = rgrandparent {
let mut wgrandparent = rgrandparent.upgrade_to_write_lock_or_restart()?;
wparent.delete_child(parent_key);
shrink(&wparent, &mut wgrandparent, grandparent_key, guard)?;
wparent.write_unlock_obsolete();
} else {
wparent.delete_child(parent_key);
wparent.write_unlock();
}
} else {
wparent.delete_child(parent_key);
wparent.write_unlock();
}
guard.remember_obsolete_node(wnode.as_ptr());
wnode.write_unlock_obsolete();
}
}
wnode.write_unlock();
wparent.write_unlock();
return Ok(());
}
@@ -334,8 +351,8 @@ where
return Ok(());
} else {
let next_child = next_node.unwrap(); // checked above it's not None
if let Some((rparent, _)) = rparent {
rparent.read_unlock_or_restart()?;
if let Some((ref rparent, _)) = rparent {
rparent.check_or_restart()?;
}
// recurse to next level
@@ -344,6 +361,7 @@ where
value_fn,
next_child,
Some((rnode, key[0])),
rparent,
guard,
level + 1,
orig_key,
@@ -491,6 +509,24 @@ fn insert_and_grow<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>>(
Ok(())
}
// On entry: 'parent' and 'node' are locked
fn shrink<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>>(
wnode: &WriteLockedNodeRef<V>,
parent: &mut WriteLockedNodeRef<V>,
parent_key_byte: u8,
guard: &'g mut TreeWriteGuard<'e, K, V, A>,
) -> Result<(), ArtError> {
eprintln!("SHRINK!");
let smaller_node = wnode.shrink(guard.tree_writer.allocator)?;
// Replace the pointer in the parent
parent.replace_child(parent_key_byte, smaller_node.into_ptr());
guard.remember_obsolete_node(wnode.as_ptr());
Ok(())
}
// Allocate a new leaf node to hold 'value'. If the key is long, we
// may need to allocate new internal nodes to hold it too
fn allocate_node_for_value<'a, V: Value, A: ArtAllocator<V>>(

View File

@@ -237,6 +237,16 @@ impl<V: Value> NodePtr<V> {
}
}
pub(crate) fn can_shrink_after_delete(&self) -> bool {
match self.variant() {
NodeVariant::Internal4(n) => n.can_shrink_after_delete(),
NodeVariant::Internal16(n) => n.can_shrink_after_delete(),
NodeVariant::Internal48(n) => n.can_shrink_after_delete(),
NodeVariant::Internal256(n) => n.can_shrink_after_delete(),
NodeVariant::Leaf(_) => panic!("can_shrink_after_delete() called on leaf node"),
}
}
pub(crate) fn find_child(&self, key_byte: u8) -> Option<NodePtr<V>> {
match self.variant() {
NodeVariant::Internal4(n) => n.find_child(key_byte),
@@ -268,13 +278,30 @@ impl<V: Value> NodePtr<V> {
}
pub(crate) fn grow(&self, allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
match self.variant() {
let bigger = match self.variant() {
NodeVariant::Internal4(n) => n.grow(allocator),
NodeVariant::Internal16(n) => n.grow(allocator),
NodeVariant::Internal48(n) => n.grow(allocator),
NodeVariant::Internal256(_) => panic!("cannot grow Internal256 node"),
NodeVariant::Leaf(_) => panic!("cannot grow Leaf node"),
}
};
/*
let mut key = 0;
loop {
let a = self.find_next_child(key);
let b = bigger.find_next_child(key);
assert_eq!(a, b);
if let Some((akey, _)) = a {
if akey == u8::MAX {
break;
}
key = akey + 1;
} else {
break;
}
}
*/
bigger
}
pub(crate) fn insert_child(&mut self, key_byte: u8, child: NodePtr<V>) {
@@ -307,6 +334,16 @@ impl<V: Value> NodePtr<V> {
}
}
pub(crate) fn shrink(&self, allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
match self.variant() {
NodeVariant::Internal4(_) => panic!("shrink called on internal4 node"),
NodeVariant::Internal16(n) => n.shrink(allocator),
NodeVariant::Internal48(n) => n.shrink(allocator),
NodeVariant::Internal256(n) => n.shrink(allocator),
NodeVariant::Leaf(_) => panic!("shrink called on leaf node"),
}
}
pub(crate) fn get_leaf_value(&self) -> &V {
match self.variant() {
NodeVariant::Internal4(_)
@@ -466,6 +503,10 @@ impl<V: Value> NodeInternal4<V> {
self.num_children == 4
}
fn can_shrink_after_delete(&self) -> bool {
false
}
fn insert_child(&mut self, key_byte: u8, child: NodePtr<V>) {
assert!(self.num_children < 4);
@@ -573,6 +614,10 @@ impl<V: Value> NodeInternal16<V> {
self.num_children == 16
}
fn can_shrink_after_delete(&self) -> bool {
self.num_children <= 5
}
fn insert_child(&mut self, key_byte: u8, child: NodePtr<V>) {
assert!(self.num_children < 16);
@@ -600,15 +645,58 @@ impl<V: Value> NodeInternal16<V> {
};
for i in 0..self.num_children as usize {
let idx = self.child_keys[i] as usize;
eprintln!("grow {i}: {idx}");
init.child_indexes[idx] = i as u8;
init.child_ptrs[i] = self.child_ptrs[i];
}
init.validate();
unsafe { ptr.write(init) };
ptr.into()
}
fn shrink(&self, allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
assert!(self.num_children <= 4);
let ptr: *mut NodeInternal4<V> = allocator.alloc_node_internal4().cast();
if ptr.is_null() {
panic!("out of memory");
}
let mut init = NodeInternal4 {
tag: NodeTag::Internal4,
lock_and_version: AtomicLockAndVersion::new(),
prefix: self.prefix.clone(),
prefix_len: self.prefix_len,
num_children: self.num_children,
child_keys: [0; 4],
child_ptrs: [const { NodePtr::null() }; 4],
};
for i in 0..self.num_children as usize {
init.child_keys[i] = self.child_keys[i];
init.child_ptrs[i] = self.child_ptrs[i];
}
unsafe { ptr.write(init) };
ptr.into()
}
}
impl<V: Value> NodeInternal48<V> {
fn validate(&self) {
let mut shadow_indexes = std::collections::HashSet::new();
let mut count = 0;
for i in 0..256 {
let idx = self.child_indexes[i];
if idx != INVALID_CHILD_INDEX {
assert!(idx < self.num_children, "i {} idx {}, num_children {}", i, idx, self.num_children);
assert!(shadow_indexes.get(&idx).is_none());
shadow_indexes.insert(idx);
count += 1;
}
}
assert_eq!(count, self.num_children);
}
fn get_prefix(&self) -> &[u8] {
&self.prefix[0..self.prefix_len as usize]
}
@@ -647,7 +735,8 @@ impl<V: Value> NodeInternal48<V> {
if idx == INVALID_CHILD_INDEX {
panic!("could not re-find parent with key {}", key_byte);
}
self.child_ptrs[idx as usize] = replacement
self.child_ptrs[idx as usize] = replacement;
self.validate();
}
fn delete_child(&mut self, key_byte: u8) {
@@ -655,20 +744,25 @@ impl<V: Value> NodeInternal48<V> {
if idx == INVALID_CHILD_INDEX as usize {
panic!("could not re-find parent with key {}", key_byte);
}
self.child_indexes[key_byte as usize] = INVALID_CHILD_INDEX;
self.num_children -= 1;
// Compact the child_ptrs array
let removed_idx = self.num_children as usize;
let removed_idx = (self.num_children - 1) as usize;
if idx != removed_idx {
for i in 0..u8::MAX as usize {
for i in 0..=u8::MAX as usize {
if self.child_indexes[i] as usize == removed_idx {
self.child_indexes[i] = idx as u8;
self.child_ptrs[idx] = self.child_ptrs[removed_idx];
self.child_indexes[key_byte as usize] = INVALID_CHILD_INDEX;
self.num_children -= 1;
self.validate();
return;
}
}
panic!("could not re-find last index on Internal48 node");
panic!("could not re-find last index {} on Internal48 node", removed_idx);
} else {
self.child_indexes[key_byte as usize] = INVALID_CHILD_INDEX;
self.num_children -= 1;
}
}
@@ -676,6 +770,10 @@ impl<V: Value> NodeInternal48<V> {
self.num_children == 48
}
fn can_shrink_after_delete(&self) -> bool {
self.num_children <= 17
}
fn insert_child(&mut self, key_byte: u8, child: NodePtr<V>) {
assert!(self.num_children < 48);
assert!(self.child_indexes[key_byte as usize] == INVALID_CHILD_INDEX);
@@ -683,6 +781,7 @@ impl<V: Value> NodeInternal48<V> {
self.child_indexes[key_byte as usize] = idx;
self.child_ptrs[idx as usize] = child;
self.num_children += 1;
self.validate();
}
fn grow(&self, allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
@@ -709,6 +808,37 @@ impl<V: Value> NodeInternal48<V> {
unsafe { ptr.write(init) };
ptr.into()
}
fn shrink(&self, allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
assert!(self.num_children <= 16);
let ptr: *mut NodeInternal16<V> = allocator.alloc_node_internal16().cast();
if ptr.is_null() {
panic!("out of memory");
}
let mut init = NodeInternal16 {
tag: NodeTag::Internal16,
lock_and_version: AtomicLockAndVersion::new(),
prefix: self.prefix.clone(),
prefix_len: self.prefix_len,
num_children: self.num_children,
child_keys: [0; 16],
child_ptrs: [const { NodePtr::null() }; 16],
};
let mut j = 0;
for i in 0..256 {
let idx = self.child_indexes[i];
if idx != INVALID_CHILD_INDEX {
init.child_keys[j] = i as u8;
init.child_ptrs[j] = self.child_ptrs[idx as usize];
j += 1;
}
}
assert_eq!(j, self.num_children as usize);
unsafe { ptr.write(init) };
ptr.into()
}
}
impl<V: Value> NodeInternal256<V> {
@@ -766,12 +896,46 @@ impl<V: Value> NodeInternal256<V> {
self.num_children == 256
}
fn can_shrink_after_delete(&self) -> bool {
self.num_children <= 49
}
fn insert_child(&mut self, key_byte: u8, child: NodePtr<V>) {
assert!(self.num_children < 256);
assert!(self.child_ptrs[key_byte as usize].is_null());
self.child_ptrs[key_byte as usize] = child;
self.num_children += 1;
}
fn shrink(&self, allocator: &impl ArtAllocator<V>) -> NodePtr<V> {
assert!(self.num_children <= 48);
let ptr: *mut NodeInternal48<V> = allocator.alloc_node_internal48().cast();
if ptr.is_null() {
panic!("out of memory");
}
let mut init = NodeInternal48 {
tag: NodeTag::Internal48,
lock_and_version: AtomicLockAndVersion::new(),
prefix: self.prefix.clone(),
prefix_len: self.prefix_len,
num_children: self.num_children as u8,
child_indexes: [INVALID_CHILD_INDEX; 256],
child_ptrs: [const { NodePtr::null() }; 48],
};
let mut j = 0;
for i in 0..256 {
if !self.child_ptrs[i].is_null() {
init.child_indexes[i] = j;
init.child_ptrs[j as usize] = self.child_ptrs[i];
j += 1;
}
}
assert_eq!(j as u16, self.num_children);
unsafe { ptr.write(init) };
ptr.into()
}
}
impl<V: Value> NodeLeaf<V> {

View File

@@ -138,6 +138,11 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> {
self.ptr.lockword().check_or_restart(self.version)?;
Ok(())
}
pub(crate) fn check_or_restart(&self) -> Result<(), ConcurrentUpdateError> {
self.ptr.lockword().check_or_restart(self.version)?;
Ok(())
}
}
/// A reference to a node that has been optimistically read-locked. The functions re-check
@@ -148,6 +153,10 @@ pub struct WriteLockedNodeRef<'e, V> {
}
impl<'e, V: Value> WriteLockedNodeRef<'e, V> {
pub(crate) fn can_shrink_after_delete(&self) -> bool {
self.ptr.can_shrink_after_delete()
}
pub(crate) fn write_unlock(mut self) {
self.ptr.lockword().write_unlock();
self.ptr = NodePtr::null();
@@ -190,6 +199,22 @@ impl<'e, V: Value> WriteLockedNodeRef<'e, V> {
})
}
pub(crate) fn shrink<'a, A>(
&self,
allocator: &'a A,
) -> Result<NewNodeRef<'a, V, A>, OutOfMemoryError>
where
A: ArtAllocator<V>,
{
// FIXME: check OOM
let new_node = self.ptr.shrink(allocator);
Ok(NewNodeRef {
ptr: new_node,
allocator,
extra_nodes: Vec::new(),
})
}
pub(crate) fn as_ptr(&self) -> NodePtr<V> {
self.ptr
}