From 44cc648dc8f290bd61d8259159d23784c77cf5e6 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Mon, 5 May 2025 01:17:32 +0300
Subject: [PATCH] Implement iterator over keys

The implementation is not very optimized, but probably good enough for an MVP.
---
 libs/neonart/src/algorithm.rs                 | 92 ++++++++++++++++++-
 libs/neonart/src/algorithm/node_ref.rs        |  2 +-
 libs/neonart/src/lib.rs                       | 69 +++++++++++++-
 libs/neonart/src/tests.rs                     | 47 +++++++++-
 .../neon/communicator/src/integrated_cache.rs | 53 ++++++++++-
 5 files changed, 249 insertions(+), 14 deletions(-)

diff --git a/libs/neonart/src/algorithm.rs b/libs/neonart/src/algorithm.rs
index 3c7831f439..6d73e6659d 100644
--- a/libs/neonart/src/algorithm.rs
+++ b/libs/neonart/src/algorithm.rs
@@ -50,7 +50,7 @@ pub(crate) fn search<'e, K: Key, V: Value>(
     key: &K,
     root: RootPtr,
     epoch_pin: &'e EpochPin,
-) -> Option<V> {
+) -> Option<&'e V> {
     loop {
         let root_ref = NodeRef::from_root_ptr(root);
         if let Ok(result) = lookup_recurse(key.as_bytes(), root_ref, None, epoch_pin) {
@@ -60,6 +60,29 @@ pub(crate) fn search<'e, K: Key, V: Value>(
     }
 }
 
+pub(crate) fn iter_next<'e, V: Value>(
+    key: &[u8],
+    root: RootPtr,
+    epoch_pin: &'e EpochPin,
+) -> Option<(Vec<u8>, &'e V)> {
+    loop {
+        let mut path = Vec::new();
+        let root_ref = NodeRef::from_root_ptr(root);
+
+        match next_recurse(key, &mut path, root_ref, epoch_pin) {
+            Ok(Some(v)) => {
+                assert_eq!(path.len(), key.len());
+                break Some((path, v))
+            },
+            Ok(None) => break None,
+            Err(ConcurrentUpdateError()) => {
+                // retry
+                continue;
+            },
+        }
+    }
+}
+
 pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>, F>(
     key: &K,
     value_fn: F,
@@ -114,7 +137,7 @@ fn lookup_recurse<'e, V: Value>(
     node: NodeRef<'e, V>,
     parent: Option<ReadLockedNodeRef<'e, V>>,
     epoch_pin: &'e EpochPin,
-) -> Result<Option<V>, ConcurrentUpdateError> {
+) -> Result<Option<&'e V>, ConcurrentUpdateError> {
     let rnode = node.read_lock_or_restart()?;
     if let Some(parent) = parent {
         parent.read_unlock_or_restart()?;
@@ -135,14 +158,75 @@ fn lookup_recurse<'e, V: Value>(
     match next_node {
         None => Ok(None), // key not found
         Some(ChildOrValue::Value(vptr)) => {
-            // safety: It's OK to follow the pointer because we checked the version.
-            let v = unsafe { (*vptr).clone() };
+            // safety: It's OK to return a ref of the pointer because we checked the version
+            // and the lifetime of 'epoch_pin' enforces that the reference is only accessible
+            // as long as the epoch is pinned.
+            let v = unsafe { vptr.as_ref().unwrap() };
             Ok(Some(v))
         }
         Some(ChildOrValue::Child(v)) => lookup_recurse(&key[1..], v, Some(rnode), epoch_pin),
     }
 }
 
+fn next_recurse<'e, V: Value>(
+    min_key: &[u8],
+    path: &mut Vec<u8>,
+    node: NodeRef<'e, V>,
+    epoch_pin: &'e EpochPin,
+) -> Result<Option<&'e V>, ConcurrentUpdateError> {
+    let rnode = node.read_lock_or_restart()?;
+    let prefix = rnode.get_prefix();
+    if prefix.len() != 0 {
+        path.extend_from_slice(prefix);
+    }
+    assert!(path.len() < min_key.len());
+
+    use std::cmp::Ordering;
+    let mut key_byte = match path.as_slice().cmp(&min_key[0..path.len()]) {
+        Ordering::Less => {
+            rnode.read_unlock_or_restart()?;
+            return Ok(None);
+        }
+        Ordering::Equal => min_key[path.len()],
+        Ordering::Greater => 0,
+    };
+    loop {
+        // TODO: This iterates through all possible byte values. That's pretty suboptimal.
+        // Implement a function to scan the node for the next key value efficiently.
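+        // A hypothetical helper on the node could do this in a single pass instead,
+        // returning the smallest occupied slot >= key_byte, along the lines of:
+        //
+        //     fn find_next_child_or_value(&self, min_key_byte: u8)
+        //         -> Result<Option<(u8, ChildOrValue<V>)>, ConcurrentUpdateError>
+        //
+        // For Node4/Node16 that would be one scan of the sorted key array; for
+        // Node48/Node256, a scan of the child index starting at min_key_byte.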
+        match rnode.find_child_or_value_or_restart(key_byte)? {
+            None => {
+                if key_byte == u8::MAX {
+                    return Ok(None);
+                }
+                key_byte += 1;
+                continue;
+            }
+            Some(ChildOrValue::Child(child_ref)) => {
+                let path_len = path.len();
+                path.push(key_byte);
+                let result = next_recurse(min_key, path, child_ref, epoch_pin)?;
+                if result.is_some() {
+                    return Ok(result);
+                }
+                if key_byte == u8::MAX {
+                    return Ok(None);
+                }
+                path.truncate(path_len);
+                key_byte += 1;
+            }
+            Some(ChildOrValue::Value(vptr)) => {
+                path.push(key_byte);
+                assert_eq!(path.len(), min_key.len());
+                // safety: It's OK to return a ref of the pointer because we checked the version
+                // and the lifetime of 'epoch_pin' enforces that the reference is only accessible
+                // as long as the epoch is pinned.
+                let v = unsafe { vptr.as_ref().unwrap() };
+                return Ok(Some(v))
+            }
+        }
+    }
+}
+
 // This corresponds to the 'insertOpt' function in the paper
 pub(crate) fn update_recurse<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>, F>(
     key: &[u8],
diff --git a/libs/neonart/src/algorithm/node_ref.rs b/libs/neonart/src/algorithm/node_ref.rs
index f1cd1cf749..dbc30c09e6 100644
--- a/libs/neonart/src/algorithm/node_ref.rs
+++ b/libs/neonart/src/algorithm/node_ref.rs
@@ -94,7 +94,7 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> {
             }))),
         }
     }
-    
+
     pub(crate) fn upgrade_to_write_lock_or_restart(
         self,
     ) -> Result<WriteLockedNodeRef<'e, V>, ConcurrentUpdateError> {
diff --git a/libs/neonart/src/lib.rs b/libs/neonart/src/lib.rs
index 4b45145c65..5be80c8a81 100644
--- a/libs/neonart/src/lib.rs
+++ b/libs/neonart/src/lib.rs
@@ -325,7 +325,8 @@ where
 
 impl<'e, K: Key, V: Value> TreeReadGuard<'e, K, V> {
     pub fn get(&self, key: &K) -> Option<V> {
-        algorithm::search(key, self.tree.root, &self.epoch_pin)
+        let vref = algorithm::search(key, self.tree.root, &self.epoch_pin);
+        vref.cloned()
     }
 }
 
@@ -347,7 +348,8 @@ impl<'t, K: Key, V: Value, A: ArtAllocator<V>> TreeWriteGuard<'t, K, V, A> {
 
     /// Get a value
     pub fn get(&mut self, key: &K) -> Option<V> {
-        algorithm::search(key, self.tree_writer.tree.root, &self.epoch_pin)
+        let v = algorithm::search(key, self.tree_writer.tree.root, &self.epoch_pin);
+        v.cloned()
     }
 
     /// Insert a value
@@ -404,6 +406,69 @@ impl<'t, K: Key, V: Value, A: ArtAllocator<V>> TreeWriteGuard<'t, K, V, A> {
     }
 }
 
+pub struct TreeIterator<K>
+    where K: Key + for<'a> From<&'a [u8]>,
+{
+    done: bool,
+    next_key: Vec<u8>,
+    max_key: Option<Vec<u8>>,
+
+    phantom_key: PhantomData<K>,
+}
+
+impl<K> TreeIterator<K>
+    where K: Key + for<'a> From<&'a [u8]>,
+{
+    pub fn new(range: &std::ops::Range<K>) -> TreeIterator<K> {
+        TreeIterator {
+            done: false,
+            next_key: Vec::from(range.start.as_bytes()),
+            max_key: Some(Vec::from(range.end.as_bytes())),
+            phantom_key: PhantomData,
+        }
+    }
+
+
+    pub fn next<'g, V>(&mut self, read_guard: TreeReadGuard<'g, K, V>) -> Option<(K, V)>
+        where V: Value
+    {
+        if self.done {
+            return None;
+        }
+        if let Some((k, v)) = algorithm::iter_next(&mut self.next_key, read_guard.tree.root, &read_guard.epoch_pin) {
+            assert_eq!(k.len(), self.next_key.len());
+            if let Some(max_key) = &self.max_key {
+                assert_eq!(k.len(), max_key.len());
+                if k.as_slice() >= max_key.as_slice() {
+                    self.done = true;
+                    return None;
+                }
+            }
+            // increment the key, so that the next call resumes after the key we just found
+            self.next_key = k.clone();
+            increment_key(self.next_key.as_mut_slice());
+            let k = k.as_slice().into();
+
+            Some((k, v.clone()))
+        } else {
+            self.done = true;
+            None
+        }
+    }
+}
+
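+/// Increment `key` in place, treating it as a fixed-length big-endian integer.
+/// Returns true if the increment wrapped around (i.e. `key` was all 0xff bytes),
+/// meaning there is no next key. For example, incrementing [0x00, 0xff] yields
+/// [0x01, 0x00] and returns false.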
+fn increment_key(key: &mut [u8]) -> bool {
+    for i in (0..key.len()).rev() {
+        let (byte, overflow) = key[i].overflowing_add(1);
+        key[i] = byte;
+        if !overflow {
+            return false;
+        }
+    }
+    true
+}
+
+
 // Debugging functions
 impl<'t, K: Key, V: Value + Debug> TreeReadGuard<'t, K, V> {
     pub fn dump(&mut self) {
diff --git a/libs/neonart/src/tests.rs b/libs/neonart/src/tests.rs
index 3b315f456f..308001f8ce 100644
--- a/libs/neonart/src/tests.rs
+++ b/libs/neonart/src/tests.rs
@@ -4,6 +4,8 @@ use std::collections::BTreeMap;
 use crate::ArtAllocator;
 use crate::ArtMultiSlabAllocator;
 use crate::TreeInitStruct;
+use crate::TreeWriteAccess;
+use crate::TreeIterator;
 use crate::{Key, Value};
 
@@ -16,9 +18,13 @@ const TEST_KEY_LEN: usize = 16;
 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
 struct TestKey([u8; TEST_KEY_LEN]);
 
+impl TestKey {
+    const MIN: TestKey = TestKey([0; TEST_KEY_LEN]);
+    const MAX: TestKey = TestKey([u8::MAX; TEST_KEY_LEN]);
+}
+
 impl Key for TestKey {
     const KEY_LEN: usize = TEST_KEY_LEN;
-
     fn as_bytes(&self) -> &[u8] {
         &self.0
     }
@@ -30,6 +36,12 @@ impl From<u128> for TestKey {
     }
 }
 
+impl<'a> From<&'a [u8]> for TestKey {
+    fn from(bytes: &'a [u8]) -> TestKey {
+        TestKey(bytes.try_into().unwrap())
+    }
+}
+
 impl Value for usize {}
 
 fn test_inserts<K: Key + From<u128> + Copy>(keys: &[K]) {
@@ -96,7 +108,7 @@ fn sparse() {
 
 #[derive(Clone, Copy, Debug)]
 struct TestOp(TestKey, Option<usize>);
-fn apply_op<A: ArtAllocator<usize>>(op: &TestOp, tree: &crate::TreeWriteAccess<TestKey, usize, A>, shadow: &mut BTreeMap<TestKey, usize>) {
+fn apply_op<A: ArtAllocator<usize>>(op: &TestOp, tree: &TreeWriteAccess<TestKey, usize, A>, shadow: &mut BTreeMap<TestKey, usize>) {
     eprintln!("applying op: {op:?}");
 
     // apply the change to the shadow tree first
@@ -114,6 +126,31 @@ fn apply_op<A: ArtAllocator<usize>>(op: &TestOp, tree: &TreeWriteAccess<TestKey, usize, A>, shadow: &mut BTreeMap<TestKey, usize>) {
     }
 }
 
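+/// Check that iterating over the full key range returns the same key-value pairs,
+/// in the same order, as the shadow BTreeMap.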
+fn test_iter<A: ArtAllocator<usize>>(tree: &TreeWriteAccess<TestKey, usize, A>, shadow: &BTreeMap<TestKey, usize>) {
+    let mut shadow_iter = shadow.iter();
+    let mut iter = TreeIterator::new(&(TestKey::MIN..TestKey::MAX));
+
+    loop {
+        let shadow_item = shadow_iter.next().map(|(k, v)| (k.clone(), v.clone()));
+        let item = iter.next(tree.start_read());
+
+        if shadow_item != item {
+            eprintln!("FAIL: iterator returned {:?}, expected {:?}", item, shadow_item);
+            tree.start_read().dump();
+
+            eprintln!("SHADOW:");
+            let mut si = shadow.iter();
+            while let Some(si) = si.next() {
+                eprintln!("key: {:?}, val: {}", si.0, si.1);
+            }
+            panic!("FAIL: iterator returned {:?}, expected {:?}", item, shadow_item);
+        }
+        if item.is_none() {
+            break;
+        }
+    }
+}
+
 #[test]
 fn random_ops() {
     const MEM_SIZE: usize = 10000000;
@@ -141,5 +178,11 @@ fn random_ops() {
         );
 
         apply_op(&op, &tree_writer, &mut shadow);
+
+        if i % 1000 == 0 {
+            eprintln!("{i} ops processed");
+            eprintln!("stats: {:?}", tree_writer.start_write().get_statistics());
+            test_iter(&tree_writer, &shadow);
+        }
     }
 }
diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs
index 37cd65ec72..17d374d697 100644
--- a/pgxn/neon/communicator/src/integrated_cache.rs
+++ b/pgxn/neon/communicator/src/integrated_cache.rs
@@ -11,15 +11,29 @@
 //! Note: This deals with "relations", which is really just one "relation fork" in Postgres
 //! terms. RelFileLocator + ForkNumber is the key.
 
+//
+// TODO: Thoughts on eviction:
+//
+// There are two things we need to track, and evict if we run out of space:
+// - blocks in the file cache's file. If the file grows too large, need to evict something.
+//   Also if the cache is resized.
+//
+// - entries in the cache tree. If we run out of memory in the shmem area, need to evict
+//   something.
+//
+
 use std::mem::MaybeUninit;
+use std::ops::Range;
 
 use utils::lsn::Lsn;
+use zerocopy::FromBytes;
 
 use crate::file_cache::{CacheBlock, FileCache};
 use pageserver_page_api::model::RelTag;
 
 use neonart;
 use neonart::TreeInitStruct;
+use neonart::TreeIterator;
 
 const CACHE_AREA_SIZE: usize = 10 * 1024 * 1024;
@@ -131,6 +145,7 @@ struct RelEntry {
     Ord,
     zerocopy_derive::IntoBytes,
     zerocopy_derive::Immutable,
+    zerocopy_derive::FromBytes,
 )]
 #[repr(packed)]
 struct TreeKey {
@@ -141,6 +156,31 @@ struct TreeKey {
     block_number: u32,
 }
 
+impl<'a> From<&'a [u8]> for TreeKey {
+    fn from(bytes: &'a [u8]) -> Self {
+        Self::read_from_bytes(bytes).expect("invalid key length")
+    }
+}
+
+fn key_range_for_rel_blocks(rel: &RelTag) -> Range<TreeKey> {
+    Range {
+        start: TreeKey {
+            spc_oid: rel.spc_oid,
+            db_oid: rel.db_oid,
+            rel_number: rel.rel_number,
+            fork_number: rel.fork_number,
+            block_number: 0,
+        },
+        end: TreeKey {
+            spc_oid: rel.spc_oid,
+            db_oid: rel.db_oid,
+            rel_number: rel.rel_number,
+            fork_number: rel.fork_number,
+            block_number: u32::MAX,
+        },
+    }
+}
+
 impl From<&RelTag> for TreeKey {
     fn from(val: &RelTag) -> TreeKey {
         TreeKey {
@@ -322,12 +362,15 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
 
     /// Forget information about given relation in the cache. (For DROP TABLE and such)
     pub fn forget_rel(&'t self, rel: &RelTag) {
-        // FIXME: not implemented properly. smgrexists() would still return true for this
         let w = self.cache_tree.start_write();
-        w.insert(
-            &TreeKey::from(rel),
-            TreeEntry::Rel(RelEntry { nblocks: None }),
-        );
+        w.remove(&TreeKey::from(rel));
+
+        // also forget all cached blocks for the relation (a fresh read guard is taken
for each iteration, and a write guard for each removal)
+        let mut iter = TreeIterator::new(&key_range_for_rel_blocks(rel));
+        while let Some((k, _v)) = iter.next(self.cache_tree.start_read()) {
+            let w = self.cache_tree.start_write();
+            w.remove(&k);
+        }
     }
 }