From e2bad5d9e93898ce7600fcfdc582b66526c1a9fc Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 12 May 2025 22:53:53 +0300 Subject: [PATCH] Add debugging HTTP endpoint for dumping the cache tree --- libs/neonart/src/algorithm.rs | 31 ++++--- libs/neonart/src/allocator/slab.rs | 16 +--- libs/neonart/src/lib.rs | 9 +- libs/neonart/src/tests.rs | 14 +++- .../neon/communicator/src/integrated_cache.rs | 84 +++++++++++-------- .../src/worker_process/main_loop.rs | 2 +- .../src/worker_process/metrics_exporter.rs | 14 ++++ 7 files changed, 105 insertions(+), 65 deletions(-) diff --git a/libs/neonart/src/algorithm.rs b/libs/neonart/src/algorithm.rs index 57cdc8db3c..1f0449929d 100644 --- a/libs/neonart/src/algorithm.rs +++ b/libs/neonart/src/algorithm.rs @@ -123,12 +123,6 @@ pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator, F>( } } -pub(crate) fn dump_tree<'e, V: Value + std::fmt::Debug>(root: RootPtr, epoch_pin: &'e EpochPin) { - let root_ref = NodeRef::from_root_ptr(root); - - let _ = dump_recurse(&[], root_ref, &epoch_pin, 0); -} - // Error means you must retry. // // This corresponds to the 'lookupOpt' function in the paper @@ -374,11 +368,23 @@ impl std::fmt::Debug for PathElement { } } +pub(crate) fn dump_tree<'e, V: Value + std::fmt::Debug>( + root: RootPtr, + epoch_pin: &'e EpochPin, + dst: &mut dyn std::io::Write, +) { + let root_ref = NodeRef::from_root_ptr(root); + + let _ = dump_recurse(&[], root_ref, &epoch_pin, 0, dst); +} + +// TODO: return an Err if writeln!() returns error, instead of unwrapping fn dump_recurse<'e, V: Value + std::fmt::Debug>( path: &[PathElement], node: NodeRef<'e, V>, epoch_pin: &'e EpochPin, level: usize, + dst: &mut dyn std::io::Write, ) -> Result<(), ConcurrentUpdateError> { let indent = str::repeat(" ", level); @@ -395,26 +401,29 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>( // and the lifetime of 'epoch_pin' enforces that the reference is only accessible // as long as the epoch is pinned. let val = unsafe { vptr.as_ref().unwrap() }; - eprintln!("{} {:?}: {:?}", indent, path, val); + writeln!(dst, "{} {:?}: {:?}", indent, path, val).unwrap(); + return Ok(()); } - for key_byte in 0..u8::MAX { + for key_byte in 0..=u8::MAX { match rnode.find_child_or_restart(key_byte)? { None => continue, Some(child_ref) => { let rchild = child_ref.read_lock_or_restart()?; - eprintln!( + writeln!( + dst, "{} {:?}, {}: prefix {:?}", indent, &path, key_byte, rchild.get_prefix() - ); + ) + .unwrap(); let mut child_path = path.clone(); child_path.push(PathElement::KeyByte(key_byte)); - dump_recurse(&child_path, child_ref, epoch_pin, level + 1)?; + dump_recurse(&child_path, child_ref, epoch_pin, level + 1, dst)?; } } } diff --git a/libs/neonart/src/allocator/slab.rs b/libs/neonart/src/allocator/slab.rs index dd17c35b44..29a3cf901a 100644 --- a/libs/neonart/src/allocator/slab.rs +++ b/libs/neonart/src/allocator/slab.rs @@ -186,15 +186,9 @@ impl SlabDesc { if !(*free_chunks_head).is_null() { let result = *free_chunks_head; (*free_chunks_head) = (*result).next; - let old = (*block_ptr).num_free_chunks.fetch_sub(1, Ordering::Relaxed); + let _old = (*block_ptr).num_free_chunks.fetch_sub(1, Ordering::Relaxed); self.num_allocated.fetch_add(1, Ordering::Relaxed); - - eprintln!( - "allocated chunk from block {:?}, {} chunks left", - block_ptr, - old - 1 - ); return result.cast(); } } @@ -226,8 +220,6 @@ impl SlabDesc { let mut block_lists_guard = self.block_lists.write(); block_lists_guard.nonfull_blocks.push_head(new_block); } - eprintln!("allocated new block {:?}", new_block); - self.num_allocated.fetch_add(1, Ordering::Relaxed); new_chunk } @@ -251,10 +243,6 @@ impl SlabDesc { num_free_chunks = (*block_ptr).num_free_chunks.fetch_add(1, Ordering::Relaxed) + 1; num_chunks = (*block_ptr).num_chunks; - eprintln!( - "deallocated chunk, block {:?} now has {} chunks left", - block_ptr, num_free_chunks - ); } if num_free_chunks == 1 { @@ -267,14 +255,12 @@ impl SlabDesc { block_lists.unlink(block_ptr); block_lists.nonfull_blocks.push_head(block_ptr); }; - eprintln!("block {:?} became non-full", block_ptr); } else if num_free_chunks == num_chunks { // If the block became completely empty, move it to the free list // TODO // FIXME: we're still holding the spinlock. It's not exactly safe to return it to // the free blocks list, is it? Defer it as garbage to wait out concurrent updates? //block_allocator.release_block() - eprintln!("block {:?} became empty", block_ptr); } // update stats diff --git a/libs/neonart/src/lib.rs b/libs/neonart/src/lib.rs index 01f1deb732..c0769b491b 100644 --- a/libs/neonart/src/lib.rs +++ b/libs/neonart/src/lib.rs @@ -548,9 +548,14 @@ fn increment_key(key: &mut [u8]) -> bool { } // Debugging functions +impl<'e, K: Key, V: Value + Debug, A: ArtAllocator> TreeWriteGuard<'e, K, V, A> { + pub fn dump(&mut self, dst: &mut dyn std::io::Write) { + algorithm::dump_tree(self.tree_writer.tree.root, &self.epoch_pin, dst) + } +} impl<'e, K: Key, V: Value + Debug> TreeReadGuard<'e, K, V> { - pub fn dump(&mut self) { - algorithm::dump_tree(self.tree.root, &self.epoch_pin) + pub fn dump(&mut self, dst: &mut dyn std::io::Write) { + algorithm::dump_tree(self.tree.root, &self.epoch_pin, dst) } } impl<'e, K: Key, V: Value> TreeWriteAccess<'e, K, V, ArtMultiSlabAllocator<'e, V>> { diff --git a/libs/neonart/src/tests.rs b/libs/neonart/src/tests.rs index 5e9b6e7f24..b9724d2760 100644 --- a/libs/neonart/src/tests.rs +++ b/libs/neonart/src/tests.rs @@ -33,6 +33,12 @@ impl Key for TestKey { } } +impl From<&TestKey> for u128 { + fn from(val: &TestKey) -> u128 { + u128::from_be_bytes(val.0) + } +} + impl From for TestKey { fn from(val: u128) -> TestKey { TestKey(val.to_be_bytes()) @@ -184,7 +190,7 @@ fn test_iter>( "FAIL: iterator returned {:?}, expected {:?}", item, shadow_item ); - tree.start_read().dump(); + tree.start_read().dump(&mut std::io::stderr()); eprintln!("SHADOW:"); let mut si = shadow.iter(); @@ -217,7 +223,11 @@ fn random_ops() { let distribution = Zipf::new(u128::MAX as f64, 1.1).unwrap(); let mut rng = rand::rng(); for i in 0..100000 { - let key: TestKey = (rng.sample(distribution) as u128).into(); + let mut key: TestKey = (rng.sample(distribution) as u128).into(); + + if rng.random_bool(0.10) { + key = TestKey::from(u128::from(&key) | 0xffffffff); + } let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None }); diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index cb56a7f6e7..8cf8b985ef 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -251,6 +251,24 @@ struct RelEntry { nblocks: AtomicU32, } +impl std::fmt::Debug for TreeEntry { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + match self { + TreeEntry::Rel(e) => fmt + .debug_struct("Rel") + .field("nblocks", &e.nblocks.load(Ordering::Relaxed)) + .finish(), + TreeEntry::Block(e) => fmt + .debug_struct("Block") + .field("lw_lsn", &e.lw_lsn.load()) + .field("cache_block", &e.cache_block.load(Ordering::Relaxed)) + .field("pinned", &e.pinned.load(Ordering::Relaxed)) + .field("referenced", &e.referenced.load(Ordering::Relaxed)) + .finish(), + } + } +} + #[derive( Clone, Debug, @@ -263,14 +281,15 @@ struct RelEntry { zerocopy_derive::FromBytes, )] #[repr(packed)] +// Note: the fields are stored in big-endian order, to make the radix tree more +// efficient, and to make scans over ranges of blocks work correctly. struct TreeKey { - spc_oid: u32, - db_oid: u32, - rel_number: u32, + spc_oid_be: u32, + db_oid_be: u32, + rel_number_be: u32, fork_number: u8, - block_number: u32, + block_number_be: u32, } - impl<'a> From<&'a [u8]> for TreeKey { fn from(bytes: &'a [u8]) -> Self { Self::read_from_bytes(bytes).expect("invalid key length") @@ -279,31 +298,19 @@ impl<'a> From<&'a [u8]> for TreeKey { fn key_range_for_rel_blocks(rel: &RelTag) -> Range { Range { - start: TreeKey { - spc_oid: rel.spc_oid, - db_oid: rel.db_oid, - rel_number: rel.rel_number, - fork_number: rel.fork_number, - block_number: 0, - }, - end: TreeKey { - spc_oid: rel.spc_oid, - db_oid: rel.db_oid, - rel_number: rel.rel_number, - fork_number: rel.fork_number, - block_number: u32::MAX, - }, + start: TreeKey::from((rel, 0)), + end: TreeKey::from((rel, u32::MAX)), } } impl From<&RelTag> for TreeKey { fn from(val: &RelTag) -> TreeKey { TreeKey { - spc_oid: val.spc_oid, - db_oid: val.db_oid, - rel_number: val.rel_number, - fork_number: val.fork_number, - block_number: u32::MAX, + spc_oid_be: val.spc_oid.to_be(), + db_oid_be: val.db_oid.to_be(), + rel_number_be: val.rel_number.to_be(), + fork_number: val.fork_number.to_be(), + block_number_be: u32::MAX.to_be(), } } } @@ -311,11 +318,11 @@ impl From<&RelTag> for TreeKey { impl From<(&RelTag, u32)> for TreeKey { fn from(val: (&RelTag, u32)) -> TreeKey { TreeKey { - spc_oid: val.0.spc_oid, - db_oid: val.0.db_oid, - rel_number: val.0.rel_number, - fork_number: val.0.fork_number, - block_number: val.1, + spc_oid_be: val.0.spc_oid.to_be(), + db_oid_be: val.0.db_oid.to_be(), + rel_number_be: val.0.rel_number.to_be(), + fork_number: val.0.fork_number.to_be(), + block_number_be: val.1.to_be(), } } } @@ -442,12 +449,16 @@ impl<'t> IntegratedCacheWriteAccess<'t> { pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) { let w = self.cache_tree.start_write(); w.update_with_fn(&TreeKey::from(rel), |existing| match existing { - None => UpdateAction::Insert(TreeEntry::Rel(RelEntry { - nblocks: AtomicU32::new(nblocks), - })), + None => { + tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks"); + UpdateAction::Insert(TreeEntry::Rel(RelEntry { + nblocks: AtomicU32::new(nblocks), + })) + } Some(TreeEntry::Block(_)) => panic!("unexpected tree entry type for rel key"), - Some(TreeEntry::Rel(rel)) => { - rel.nblocks.store(nblocks, Ordering::Relaxed); + Some(TreeEntry::Rel(e)) => { + tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks"); + e.nblocks.store(nblocks, Ordering::Relaxed); UpdateAction::Nothing } }); @@ -620,6 +631,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { /// Forget information about given relation in the cache. (For DROP TABLE and such) pub fn forget_rel(&'t self, rel: &RelTag) { + tracing::info!("forgetting rel entry for {rel:?}"); let w = self.cache_tree.start_write(); w.remove(&TreeKey::from(rel)); @@ -726,6 +738,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // Give up if we didn't find anything None } + + pub fn dump_tree(&self, dst: &mut dyn std::io::Write) { + self.cache_tree.start_read().dump(dst); + } } impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 59edbdb831..8c3498ab7e 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -29,7 +29,7 @@ pub struct CommunicatorWorkerProcessStruct<'a> { pageserver_client: PageserverClient, - cache: IntegratedCacheWriteAccess<'a>, + pub(crate) cache: IntegratedCacheWriteAccess<'a>, submission_pipe_read_raw_fd: i32, diff --git a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs index f4de0c0f2d..160b37fa37 100644 --- a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs +++ b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs @@ -19,6 +19,7 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> { use axum::routing::get; let app = Router::new() .route("/metrics", get(get_metrics)) + .route("/dump_cache_tree", get(dump_cache_tree)) .with_state(self); // TODO: make configurable. Or listen on unix domain socket? @@ -33,6 +34,19 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> { } } +async fn dump_cache_tree( + State(state): State<&CommunicatorWorkerProcessStruct<'static>>, +) -> Response { + let mut buf: Vec = Vec::new(); + state.cache.dump_tree(&mut buf); + + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "application/text") + .body(Body::from(buf)) + .unwrap() +} + /// Expose Prometheus metrics. async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct<'static>>) -> Response { use metrics::core::Collector;