mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 17:02:56 +00:00
Add debugging HTTP endpoint for dumping the cache tree
This commit is contained in:
@@ -123,12 +123,6 @@ pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator<V>, F>(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn dump_tree<'e, V: Value + std::fmt::Debug>(root: RootPtr<V>, epoch_pin: &'e EpochPin) {
|
||||
let root_ref = NodeRef::from_root_ptr(root);
|
||||
|
||||
let _ = dump_recurse(&[], root_ref, &epoch_pin, 0);
|
||||
}
|
||||
|
||||
// Error means you must retry.
|
||||
//
|
||||
// This corresponds to the 'lookupOpt' function in the paper
|
||||
@@ -374,11 +368,23 @@ impl std::fmt::Debug for PathElement {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn dump_tree<'e, V: Value + std::fmt::Debug>(
|
||||
root: RootPtr<V>,
|
||||
epoch_pin: &'e EpochPin,
|
||||
dst: &mut dyn std::io::Write,
|
||||
) {
|
||||
let root_ref = NodeRef::from_root_ptr(root);
|
||||
|
||||
let _ = dump_recurse(&[], root_ref, &epoch_pin, 0, dst);
|
||||
}
|
||||
|
||||
// TODO: return an Err if writeln!() returns error, instead of unwrapping
|
||||
fn dump_recurse<'e, V: Value + std::fmt::Debug>(
|
||||
path: &[PathElement],
|
||||
node: NodeRef<'e, V>,
|
||||
epoch_pin: &'e EpochPin,
|
||||
level: usize,
|
||||
dst: &mut dyn std::io::Write,
|
||||
) -> Result<(), ConcurrentUpdateError> {
|
||||
let indent = str::repeat(" ", level);
|
||||
|
||||
@@ -395,26 +401,29 @@ fn dump_recurse<'e, V: Value + std::fmt::Debug>(
|
||||
// and the lifetime of 'epoch_pin' enforces that the reference is only accessible
|
||||
// as long as the epoch is pinned.
|
||||
let val = unsafe { vptr.as_ref().unwrap() };
|
||||
eprintln!("{} {:?}: {:?}", indent, path, val);
|
||||
writeln!(dst, "{} {:?}: {:?}", indent, path, val).unwrap();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for key_byte in 0..u8::MAX {
|
||||
for key_byte in 0..=u8::MAX {
|
||||
match rnode.find_child_or_restart(key_byte)? {
|
||||
None => continue,
|
||||
Some(child_ref) => {
|
||||
let rchild = child_ref.read_lock_or_restart()?;
|
||||
eprintln!(
|
||||
writeln!(
|
||||
dst,
|
||||
"{} {:?}, {}: prefix {:?}",
|
||||
indent,
|
||||
&path,
|
||||
key_byte,
|
||||
rchild.get_prefix()
|
||||
);
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut child_path = path.clone();
|
||||
child_path.push(PathElement::KeyByte(key_byte));
|
||||
|
||||
dump_recurse(&child_path, child_ref, epoch_pin, level + 1)?;
|
||||
dump_recurse(&child_path, child_ref, epoch_pin, level + 1, dst)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,15 +186,9 @@ impl SlabDesc {
|
||||
if !(*free_chunks_head).is_null() {
|
||||
let result = *free_chunks_head;
|
||||
(*free_chunks_head) = (*result).next;
|
||||
let old = (*block_ptr).num_free_chunks.fetch_sub(1, Ordering::Relaxed);
|
||||
let _old = (*block_ptr).num_free_chunks.fetch_sub(1, Ordering::Relaxed);
|
||||
|
||||
self.num_allocated.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
eprintln!(
|
||||
"allocated chunk from block {:?}, {} chunks left",
|
||||
block_ptr,
|
||||
old - 1
|
||||
);
|
||||
return result.cast();
|
||||
}
|
||||
}
|
||||
@@ -226,8 +220,6 @@ impl SlabDesc {
|
||||
let mut block_lists_guard = self.block_lists.write();
|
||||
block_lists_guard.nonfull_blocks.push_head(new_block);
|
||||
}
|
||||
eprintln!("allocated new block {:?}", new_block);
|
||||
|
||||
self.num_allocated.fetch_add(1, Ordering::Relaxed);
|
||||
new_chunk
|
||||
}
|
||||
@@ -251,10 +243,6 @@ impl SlabDesc {
|
||||
|
||||
num_free_chunks = (*block_ptr).num_free_chunks.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
num_chunks = (*block_ptr).num_chunks;
|
||||
eprintln!(
|
||||
"deallocated chunk, block {:?} now has {} chunks left",
|
||||
block_ptr, num_free_chunks
|
||||
);
|
||||
}
|
||||
|
||||
if num_free_chunks == 1 {
|
||||
@@ -267,14 +255,12 @@ impl SlabDesc {
|
||||
block_lists.unlink(block_ptr);
|
||||
block_lists.nonfull_blocks.push_head(block_ptr);
|
||||
};
|
||||
eprintln!("block {:?} became non-full", block_ptr);
|
||||
} else if num_free_chunks == num_chunks {
|
||||
// If the block became completely empty, move it to the free list
|
||||
// TODO
|
||||
// FIXME: we're still holding the spinlock. It's not exactly safe to return it to
|
||||
// the free blocks list, is it? Defer it as garbage to wait out concurrent updates?
|
||||
//block_allocator.release_block()
|
||||
eprintln!("block {:?} became empty", block_ptr);
|
||||
}
|
||||
|
||||
// update stats
|
||||
|
||||
@@ -548,9 +548,14 @@ fn increment_key(key: &mut [u8]) -> bool {
|
||||
}
|
||||
|
||||
// Debugging functions
|
||||
impl<'e, K: Key, V: Value + Debug, A: ArtAllocator<V>> TreeWriteGuard<'e, K, V, A> {
|
||||
pub fn dump(&mut self, dst: &mut dyn std::io::Write) {
|
||||
algorithm::dump_tree(self.tree_writer.tree.root, &self.epoch_pin, dst)
|
||||
}
|
||||
}
|
||||
impl<'e, K: Key, V: Value + Debug> TreeReadGuard<'e, K, V> {
|
||||
pub fn dump(&mut self) {
|
||||
algorithm::dump_tree(self.tree.root, &self.epoch_pin)
|
||||
pub fn dump(&mut self, dst: &mut dyn std::io::Write) {
|
||||
algorithm::dump_tree(self.tree.root, &self.epoch_pin, dst)
|
||||
}
|
||||
}
|
||||
impl<'e, K: Key, V: Value> TreeWriteAccess<'e, K, V, ArtMultiSlabAllocator<'e, V>> {
|
||||
|
||||
@@ -33,6 +33,12 @@ impl Key for TestKey {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&TestKey> for u128 {
|
||||
fn from(val: &TestKey) -> u128 {
|
||||
u128::from_be_bytes(val.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<u128> for TestKey {
|
||||
fn from(val: u128) -> TestKey {
|
||||
TestKey(val.to_be_bytes())
|
||||
@@ -184,7 +190,7 @@ fn test_iter<A: ArtAllocator<TestValue>>(
|
||||
"FAIL: iterator returned {:?}, expected {:?}",
|
||||
item, shadow_item
|
||||
);
|
||||
tree.start_read().dump();
|
||||
tree.start_read().dump(&mut std::io::stderr());
|
||||
|
||||
eprintln!("SHADOW:");
|
||||
let mut si = shadow.iter();
|
||||
@@ -217,7 +223,11 @@ fn random_ops() {
|
||||
let distribution = Zipf::new(u128::MAX as f64, 1.1).unwrap();
|
||||
let mut rng = rand::rng();
|
||||
for i in 0..100000 {
|
||||
let key: TestKey = (rng.sample(distribution) as u128).into();
|
||||
let mut key: TestKey = (rng.sample(distribution) as u128).into();
|
||||
|
||||
if rng.random_bool(0.10) {
|
||||
key = TestKey::from(u128::from(&key) | 0xffffffff);
|
||||
}
|
||||
|
||||
let op = TestOp(key, if rng.random_bool(0.75) { Some(i) } else { None });
|
||||
|
||||
|
||||
@@ -251,6 +251,24 @@ struct RelEntry {
|
||||
nblocks: AtomicU32,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for TreeEntry {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
TreeEntry::Rel(e) => fmt
|
||||
.debug_struct("Rel")
|
||||
.field("nblocks", &e.nblocks.load(Ordering::Relaxed))
|
||||
.finish(),
|
||||
TreeEntry::Block(e) => fmt
|
||||
.debug_struct("Block")
|
||||
.field("lw_lsn", &e.lw_lsn.load())
|
||||
.field("cache_block", &e.cache_block.load(Ordering::Relaxed))
|
||||
.field("pinned", &e.pinned.load(Ordering::Relaxed))
|
||||
.field("referenced", &e.referenced.load(Ordering::Relaxed))
|
||||
.finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
@@ -263,14 +281,15 @@ struct RelEntry {
|
||||
zerocopy_derive::FromBytes,
|
||||
)]
|
||||
#[repr(packed)]
|
||||
// Note: the fields are stored in big-endian order, to make the radix tree more
|
||||
// efficient, and to make scans over ranges of blocks work correctly.
|
||||
struct TreeKey {
|
||||
spc_oid: u32,
|
||||
db_oid: u32,
|
||||
rel_number: u32,
|
||||
spc_oid_be: u32,
|
||||
db_oid_be: u32,
|
||||
rel_number_be: u32,
|
||||
fork_number: u8,
|
||||
block_number: u32,
|
||||
block_number_be: u32,
|
||||
}
|
||||
|
||||
impl<'a> From<&'a [u8]> for TreeKey {
|
||||
fn from(bytes: &'a [u8]) -> Self {
|
||||
Self::read_from_bytes(bytes).expect("invalid key length")
|
||||
@@ -279,31 +298,19 @@ impl<'a> From<&'a [u8]> for TreeKey {
|
||||
|
||||
fn key_range_for_rel_blocks(rel: &RelTag) -> Range<TreeKey> {
|
||||
Range {
|
||||
start: TreeKey {
|
||||
spc_oid: rel.spc_oid,
|
||||
db_oid: rel.db_oid,
|
||||
rel_number: rel.rel_number,
|
||||
fork_number: rel.fork_number,
|
||||
block_number: 0,
|
||||
},
|
||||
end: TreeKey {
|
||||
spc_oid: rel.spc_oid,
|
||||
db_oid: rel.db_oid,
|
||||
rel_number: rel.rel_number,
|
||||
fork_number: rel.fork_number,
|
||||
block_number: u32::MAX,
|
||||
},
|
||||
start: TreeKey::from((rel, 0)),
|
||||
end: TreeKey::from((rel, u32::MAX)),
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&RelTag> for TreeKey {
|
||||
fn from(val: &RelTag) -> TreeKey {
|
||||
TreeKey {
|
||||
spc_oid: val.spc_oid,
|
||||
db_oid: val.db_oid,
|
||||
rel_number: val.rel_number,
|
||||
fork_number: val.fork_number,
|
||||
block_number: u32::MAX,
|
||||
spc_oid_be: val.spc_oid.to_be(),
|
||||
db_oid_be: val.db_oid.to_be(),
|
||||
rel_number_be: val.rel_number.to_be(),
|
||||
fork_number: val.fork_number.to_be(),
|
||||
block_number_be: u32::MAX.to_be(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -311,11 +318,11 @@ impl From<&RelTag> for TreeKey {
|
||||
impl From<(&RelTag, u32)> for TreeKey {
|
||||
fn from(val: (&RelTag, u32)) -> TreeKey {
|
||||
TreeKey {
|
||||
spc_oid: val.0.spc_oid,
|
||||
db_oid: val.0.db_oid,
|
||||
rel_number: val.0.rel_number,
|
||||
fork_number: val.0.fork_number,
|
||||
block_number: val.1,
|
||||
spc_oid_be: val.0.spc_oid.to_be(),
|
||||
db_oid_be: val.0.db_oid.to_be(),
|
||||
rel_number_be: val.0.rel_number.to_be(),
|
||||
fork_number: val.0.fork_number.to_be(),
|
||||
block_number_be: val.1.to_be(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -442,12 +449,16 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
|
||||
pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) {
|
||||
let w = self.cache_tree.start_write();
|
||||
w.update_with_fn(&TreeKey::from(rel), |existing| match existing {
|
||||
None => UpdateAction::Insert(TreeEntry::Rel(RelEntry {
|
||||
nblocks: AtomicU32::new(nblocks),
|
||||
})),
|
||||
None => {
|
||||
tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks");
|
||||
UpdateAction::Insert(TreeEntry::Rel(RelEntry {
|
||||
nblocks: AtomicU32::new(nblocks),
|
||||
}))
|
||||
}
|
||||
Some(TreeEntry::Block(_)) => panic!("unexpected tree entry type for rel key"),
|
||||
Some(TreeEntry::Rel(rel)) => {
|
||||
rel.nblocks.store(nblocks, Ordering::Relaxed);
|
||||
Some(TreeEntry::Rel(e)) => {
|
||||
tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks");
|
||||
e.nblocks.store(nblocks, Ordering::Relaxed);
|
||||
UpdateAction::Nothing
|
||||
}
|
||||
});
|
||||
@@ -620,6 +631,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
|
||||
|
||||
/// Forget information about given relation in the cache. (For DROP TABLE and such)
|
||||
pub fn forget_rel(&'t self, rel: &RelTag) {
|
||||
tracing::info!("forgetting rel entry for {rel:?}");
|
||||
let w = self.cache_tree.start_write();
|
||||
w.remove(&TreeKey::from(rel));
|
||||
|
||||
@@ -726,6 +738,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
|
||||
// Give up if we didn't find anything
|
||||
None
|
||||
}
|
||||
|
||||
pub fn dump_tree(&self, dst: &mut dyn std::io::Write) {
|
||||
self.cache_tree.start_read().dump(dst);
|
||||
}
|
||||
}
|
||||
|
||||
impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> {
|
||||
|
||||
@@ -29,7 +29,7 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
|
||||
|
||||
pageserver_client: PageserverClient,
|
||||
|
||||
cache: IntegratedCacheWriteAccess<'a>,
|
||||
pub(crate) cache: IntegratedCacheWriteAccess<'a>,
|
||||
|
||||
submission_pipe_read_raw_fd: i32,
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> {
|
||||
use axum::routing::get;
|
||||
let app = Router::new()
|
||||
.route("/metrics", get(get_metrics))
|
||||
.route("/dump_cache_tree", get(dump_cache_tree))
|
||||
.with_state(self);
|
||||
|
||||
// TODO: make configurable. Or listen on unix domain socket?
|
||||
@@ -33,6 +34,19 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn dump_cache_tree(
|
||||
State(state): State<&CommunicatorWorkerProcessStruct<'static>>,
|
||||
) -> Response {
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
state.cache.dump_tree(&mut buf);
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, "application/text")
|
||||
.body(Body::from(buf))
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Expose Prometheus metrics.
|
||||
async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct<'static>>) -> Response {
|
||||
use metrics::core::Collector;
|
||||
|
||||
Reference in New Issue
Block a user