diff --git a/Cargo.lock b/Cargo.lock index c4122a142e..a9b54ffcaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1346,14 +1346,17 @@ name = "communicator" version = "0.1.0" dependencies = [ "atomic_enum", + "axum 0.8.1", "bytes", "cbindgen", "http 1.1.0", "libc", + "metrics", "neonart", "nix 0.27.1", "pageserver_client_grpc", "pageserver_page_api", + "prometheus", "prost 0.13.3", "thiserror 1.0.69", "tokio", diff --git a/libs/neonart/src/algorithm.rs b/libs/neonart/src/algorithm.rs index 6d73e6659d..ae05970ca4 100644 --- a/libs/neonart/src/algorithm.rs +++ b/libs/neonart/src/algorithm.rs @@ -182,7 +182,7 @@ fn next_recurse<'e, V: Value>( assert!(path.len() < min_key.len()); use std::cmp::Ordering; - let mut key_byte = match path.as_slice().cmp(&min_key[0..path.len()]) { + let mut min_key_byte = match path.as_slice().cmp(&min_key[0..path.len()]) { Ordering::Less => { rnode.read_unlock_or_restart()?; return Ok(None); @@ -191,17 +191,11 @@ fn next_recurse<'e, V: Value>( Ordering::Greater => 0, }; loop { - // TODO: This iterates through all possible byte values. That's pretty unoptimal. - // Implement a function to scan the node for next key value efficiently. - match rnode.find_child_or_value_or_restart(key_byte)? { + match rnode.find_next_child_or_value_or_restart(min_key_byte)? { None => { - if key_byte == u8::MAX { - return Ok(None); - } - key_byte += 1; - continue; - } - Some(ChildOrValue::Child(child_ref)) => { + return Ok(None); + }, + Some((key_byte, ChildOrValue::Child(child_ref))) => { let path_len = path.len(); path.push(key_byte); let result = next_recurse(min_key, path, child_ref, epoch_pin)?; @@ -212,9 +206,9 @@ fn next_recurse<'e, V: Value>( return Ok(None); } path.truncate(path_len); - key_byte += 1; - } - Some(ChildOrValue::Value(vptr)) => { + min_key_byte = key_byte + 1; + }, + Some((key_byte, ChildOrValue::Value(vptr))) => { path.push(key_byte); assert_eq!(path.len(), min_key.len()); // safety: It's OK to return a ref of the pointer because we checked the version @@ -222,7 +216,7 @@ fn next_recurse<'e, V: Value>( // as long as the epoch is pinned. let v = unsafe { vptr.as_ref().unwrap() }; return Ok(Some(v)) - } + }, } } } diff --git a/libs/neonart/src/algorithm/node_ptr.rs b/libs/neonart/src/algorithm/node_ptr.rs index d1a043b550..be1f36e560 100644 --- a/libs/neonart/src/algorithm/node_ptr.rs +++ b/libs/neonart/src/algorithm/node_ptr.rs @@ -100,8 +100,6 @@ pub struct NodeInternal16 { child_ptrs: [NodePtr; 16], } -const INVALID_CHILD_INDEX: u8 = u8::MAX; - #[repr(C)] pub struct NodeInternal48 { tag: NodeTag, @@ -114,6 +112,7 @@ pub struct NodeInternal48 { child_indexes: [u8; 256], child_ptrs: [NodePtr; 48], } +const INVALID_CHILD_INDEX: u8 = u8::MAX; #[repr(C)] pub struct NodeInternal256 { @@ -339,6 +338,35 @@ impl NodePtr { } } + pub(crate) fn find_next_child_or_value(&self, key_byte: u8) -> Option<(u8, ChildOrValuePtr)> { + match self.variant() { + NodeVariant::Internal4(n) => n + .find_next_child(key_byte) + .map(|(k, c)| (k, ChildOrValuePtr::Child(c))), + NodeVariant::Internal16(n) => n + .find_next_child(key_byte) + .map(|(k, c)| (k, ChildOrValuePtr::Child(c))), + NodeVariant::Internal48(n) => n + .find_next_child(key_byte) + .map(|(k, c)| (k, ChildOrValuePtr::Child(c))), + NodeVariant::Internal256(n) => n + .find_next_child(key_byte) + .map(|(k, c)| (k, ChildOrValuePtr::Child(c))), + NodeVariant::Leaf4(n) => n + .find_next_leaf_value(key_byte) + .map(|(k, v)| (k, ChildOrValuePtr::Value(v))), + NodeVariant::Leaf16(n) => n + .find_next_leaf_value(key_byte) + .map(|(k, v)| (k, ChildOrValuePtr::Value(v))), + NodeVariant::Leaf48(n) => n + .find_next_leaf_value(key_byte) + .map(|(k, v)| (k, ChildOrValuePtr::Value(v))), + NodeVariant::Leaf256(n) => n + .find_next_leaf_value(key_byte) + .map(|(k, v)| (k, ChildOrValuePtr::Value(v))), + } + } + pub(crate) fn truncate_prefix(&mut self, new_prefix_len: usize) { match self.variant_mut() { NodeVariantMut::Internal4(n) => n.truncate_prefix(new_prefix_len), @@ -512,6 +540,27 @@ impl NodeInternal4 { None } + fn find_next_child(&self, min_key: u8) -> Option<(u8, NodePtr)> { + let mut found: Option<(usize, u8)> = None; + for i in 0..self.num_children as usize { + let this_key = self.child_keys[i]; + if this_key >= min_key { + if let Some((_, found_key)) = found { + if this_key < found_key { + found = Some((i, this_key)); + } + } else { + found = Some((i, this_key)); + } + } + } + if let Some((found_idx, found_key)) = found { + Some((found_key, self.child_ptrs[found_idx])) + } else { + None + } + } + fn replace_child(&mut self, key_byte: u8, replacement: NodePtr) { for i in 0..self.num_children as usize { if self.child_keys[i] == key_byte { @@ -584,6 +633,27 @@ impl NodeInternal16 { None } + fn find_next_child(&self, min_key: u8) -> Option<(u8, NodePtr)> { + let mut found: Option<(usize, u8)> = None; + for i in 0..self.num_children as usize { + let this_key = self.child_keys[i]; + if this_key >= min_key { + if let Some((_, found_key)) = found { + if this_key < found_key { + found = Some((i, this_key)); + } + } else { + found = Some((i, this_key)); + } + } + } + if let Some((found_idx, found_key)) = found { + Some((found_key, self.child_ptrs[found_idx])) + } else { + None + } + } + fn replace_child(&mut self, key_byte: u8, replacement: NodePtr) { for i in 0..self.num_children as usize { if self.child_keys[i] == key_byte { @@ -657,6 +727,16 @@ impl NodeInternal48 { } } + fn find_next_child(&self, min_key: u8) -> Option<(u8, NodePtr)> { + for key in min_key..=u8::MAX { + let idx = self.child_indexes[key as usize]; + if idx != INVALID_CHILD_INDEX { + return Some((key, self.child_ptrs[idx as usize])); + } + } + None + } + fn replace_child(&mut self, key_byte: u8, replacement: NodePtr) { let idx = self.child_indexes[key_byte as usize]; if idx != INVALID_CHILD_INDEX { @@ -729,6 +809,15 @@ impl NodeInternal256 { } } + fn find_next_child(&self, min_key: u8) -> Option<(u8, NodePtr)> { + for key in min_key..=u8::MAX { + if !self.child_ptrs[key as usize].is_null() { + return Some((key, self.child_ptrs[key as usize])); + } + } + None + } + fn replace_child(&mut self, key_byte: u8, replacement: NodePtr) { let idx = key_byte as usize; if !self.child_ptrs[idx].is_null() { @@ -774,6 +863,28 @@ impl NodeLeaf4 { } None } + + fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> { + let mut found: Option<(usize, u8)> = None; + for i in 0..self.num_values as usize { + let this_key = self.child_keys[i]; + if this_key >= min_key { + if let Some((_, found_key)) = found { + if this_key < found_key { + found = Some((i, this_key)); + } + } else { + found = Some((i, this_key)); + } + } + } + if let Some((found_idx, found_key)) = found { + Some((found_key, self.child_values[found_idx].as_ref().unwrap())) + } else { + None + } + } + fn is_full(&self) -> bool { self.num_values == 4 } @@ -853,6 +964,28 @@ impl NodeLeaf16 { } None } + + fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> { + let mut found: Option<(usize, u8)> = None; + for i in 0..self.num_values as usize { + let this_key = self.child_keys[i]; + if this_key >= min_key { + if let Some((_, found_key)) = found { + if this_key < found_key { + found = Some((i, this_key)); + } + } else { + found = Some((i, this_key)); + } + } + } + if let Some((found_idx, found_key)) = found { + Some((found_key, self.child_values[found_idx].as_ref().unwrap())) + } else { + None + } + } + fn is_full(&self) -> bool { self.num_values == 16 } @@ -932,6 +1065,17 @@ impl NodeLeaf48 { None } } + + fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> { + for key in min_key..=u8::MAX { + let idx = self.child_indexes[key as usize]; + if idx != INVALID_CHILD_INDEX { + return Some((key, &self.child_values[idx as usize].as_ref().unwrap())); + } + } + None + } + fn is_full(&self) -> bool { self.num_values == 48 } @@ -1017,6 +1161,16 @@ impl NodeLeaf256 { let idx = key as usize; self.child_values[idx].as_ref() } + + fn find_next_leaf_value<'a: 'b, 'b>(&'a self, min_key: u8) -> Option<(u8, &'b V)> { + for key in min_key..=u8::MAX { + if let Some(v) = &self.child_values[key as usize] { + return Some((key, v)); + } + } + None + } + fn is_full(&self) -> bool { self.num_values == 256 } diff --git a/libs/neonart/src/algorithm/node_ref.rs b/libs/neonart/src/algorithm/node_ref.rs index dbc30c09e6..c896b4b147 100644 --- a/libs/neonart/src/algorithm/node_ref.rs +++ b/libs/neonart/src/algorithm/node_ref.rs @@ -94,6 +94,23 @@ impl<'e, V: Value> ReadLockedNodeRef<'e, V> { }))), } } + + pub(crate) fn find_next_child_or_value_or_restart( + &self, + min_key_byte: u8, + ) -> Result)>, ConcurrentUpdateError> { + let child_or_value = self.ptr.find_next_child_or_value(min_key_byte); + self.ptr.lockword().check_or_restart(self.version)?; + + match child_or_value { + None => Ok(None), + Some((k, ChildOrValuePtr::Value(vptr)) )=> Ok(Some((k, ChildOrValue::Value(vptr)))), + Some((k, ChildOrValuePtr::Child(child_ptr))) => Ok(Some((k, ChildOrValue::Child(NodeRef { + ptr: child_ptr, + phantom: self.phantom, + })))), + } + } pub(crate) fn upgrade_to_write_lock_or_restart( self, diff --git a/libs/neonart/src/allocator/block.rs b/libs/neonart/src/allocator/block.rs index a8e02ef4b8..4e7e37adef 100644 --- a/libs/neonart/src/allocator/block.rs +++ b/libs/neonart/src/allocator/block.rs @@ -116,6 +116,8 @@ impl<'t> BlockAllocator<'t> { return INVALID_BLOCK; } + // TODO: this is currently unused. The slab allocator never releases blocks + #[allow(dead_code)] pub(crate) fn release_block(&self, block_ptr: *mut u8) { let blockno = unsafe { block_ptr.byte_offset_from(self.blocks_ptr) / BLOCK_SIZE as isize }; self.release_block_internal(blockno as u64); diff --git a/libs/neonart/src/lib.rs b/libs/neonart/src/lib.rs index 9b6a8389bd..9eafb8c7e6 100644 --- a/libs/neonart/src/lib.rs +++ b/libs/neonart/src/lib.rs @@ -324,9 +324,8 @@ where } impl<'e, K: Key, V: Value> TreeReadGuard<'e, K, V> { - pub fn get(&self, key: &K) -> Option { - let vref = algorithm::search(key, self.tree.root, &self.epoch_pin); - vref.cloned() + pub fn get(&'e self, key: &K) -> Option<&'e V> { + algorithm::search(key, self.tree.root, &self.epoch_pin) } } @@ -347,9 +346,8 @@ where impl<'t, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'t, K, V, A> { /// Get a value - pub fn get(&mut self, key: &K) -> Option { - let v = algorithm::search(key, self.tree_writer.tree.root, &self.epoch_pin); - v.cloned() + pub fn get(&'t mut self, key: &K) -> Option<&'t V> { + algorithm::search(key, self.tree_writer.tree.root, &self.epoch_pin) } /// Insert a value @@ -377,13 +375,11 @@ impl<'t, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'t, K, V, A> { where F: FnOnce(Option<&V>) -> Option, { - let result = algorithm::update_fn(key, value_fn, self.tree_writer.tree.root, &mut self); + algorithm::update_fn(key, value_fn, self.tree_writer.tree.root, &mut self); if self.created_garbage { - let n = self.collect_garbage(); - eprintln!("collected {n} obsolete nodes"); + let _ = self.collect_garbage(); } - result } fn remember_obsolete_node(&mut self, ptr: NodePtr) { @@ -415,7 +411,7 @@ pub struct TreeIterator where K: Key + for<'a> From<&'a [u8]>, { done: bool, - next_key: Vec, + pub next_key: Vec, max_key: Option>, phantom_key: PhantomData, @@ -436,12 +432,16 @@ impl TreeIterator } pub fn new(range: &std::ops::Range) -> TreeIterator { - TreeIterator { + let result = TreeIterator { done: false, next_key: Vec::from(range.start.as_bytes()), max_key: Some(Vec::from(range.end.as_bytes())), phantom_key: PhantomData, - } + }; + assert_eq!(result.next_key.len(), K::KEY_LEN); + assert_eq!(result.max_key.as_ref().unwrap().len(), K::KEY_LEN); + + result } @@ -451,27 +451,48 @@ impl TreeIterator if self.done { return None; } - if let Some((k , v)) = algorithm::iter_next(&mut self.next_key, read_guard.tree.root, &read_guard.epoch_pin) { - assert_eq!(k.len(), self.next_key.len()); - // Check if we reached the end of the range - if let Some(max_key) = &self.max_key { - assert_eq!(k.len(), max_key.len()); - if k.as_slice() >= max_key.as_slice() { - self.done = true; - return None; + let mut wrapped_around = false; + loop { + assert_eq!(self.next_key.len(), K::KEY_LEN); + if let Some((k , v)) = algorithm::iter_next(&mut self.next_key, read_guard.tree.root, &read_guard.epoch_pin) { + assert_eq!(k.len(), K::KEY_LEN); + assert_eq!(self.next_key.len(), K::KEY_LEN); + + // Check if we reached the end of the range + if let Some(max_key) = &self.max_key { + if k.as_slice() >= max_key.as_slice() { + self.done = true; + break None; + } } + + // increment the key + self.next_key = k.clone(); + increment_key(self.next_key.as_mut_slice()); + let k = k.as_slice().into(); + + break Some((k, v)) + } else { + if self.max_key.is_some() { + self.done = true; + } else { + // Start from beginning + if !wrapped_around { + for i in 0..K::KEY_LEN { + self.next_key[i] = 0; + } + wrapped_around = true; + continue; + } else { + // The tree is completely empty + // FIXME: perhaps we should remember the starting point instead. + // Currently this will scan some ranges twice. + break None; + } + } + break None } - - // increment the key - self.next_key = k.clone(); - increment_key(self.next_key.as_mut_slice()); - let k = k.as_slice().into(); - - Some((k, v)) - } else { - self.done = true; - None } } } diff --git a/libs/neonart/src/tests.rs b/libs/neonart/src/tests.rs index 0b6ab685e8..2d33ee53b0 100644 --- a/libs/neonart/src/tests.rs +++ b/libs/neonart/src/tests.rs @@ -61,7 +61,7 @@ fn test_inserts + Copy>(keys: &[K]) { for (idx, k) in keys.iter().enumerate() { let r = tree_writer.start_read(); let value = r.get(&(*k).into()); - assert_eq!(value, Some(idx)); + assert_eq!(value, Some(idx).as_ref()); } eprintln!("stats: {:?}", tree_writer.start_write().get_statistics()); diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml index f5b0bde27f..b34c3843e2 100644 --- a/pgxn/neon/communicator/Cargo.toml +++ b/pgxn/neon/communicator/Cargo.toml @@ -7,11 +7,13 @@ edition = "2024" crate-type = ["staticlib"] [dependencies] +axum.workspace = true bytes.workspace = true http.workspace = true libc.workspace = true nix.workspace = true atomic_enum = "0.3.0" +prometheus.workspace = true prost.workspace = true tonic = { version = "0.12.0", default-features = false, features=["codegen", "prost", "transport"] } tokio = { version = "1.43.1", features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] } @@ -22,6 +24,7 @@ tracing-subscriber.workspace = true zerocopy = "0.8.0" zerocopy-derive = "0.8.0" +metrics.workspace = true tokio-epoll-uring.workspace = true uring-common.workspace = true diff --git a/pgxn/neon/communicator/src/file_cache.rs b/pgxn/neon/communicator/src/file_cache.rs index 45cc7b02a2..2d0ec726b4 100644 --- a/pgxn/neon/communicator/src/file_cache.rs +++ b/pgxn/neon/communicator/src/file_cache.rs @@ -11,11 +11,10 @@ use std::fs::File; use std::path::Path; use std::sync::Arc; +use std::sync::Mutex; use tokio_epoll_uring; -use std::sync::Mutex; - use crate::BLCKSZ; pub type CacheBlock = u64; @@ -25,7 +24,11 @@ pub struct FileCache { file: Arc, - free_list: Mutex + free_list: Mutex, + + // metrics + max_blocks_gauge: metrics::IntGauge, + num_free_blocks_gauge: metrics::IntGauge, } // TODO: We keep track of all free blocks in this vec. That doesn't really scale. @@ -39,9 +42,14 @@ struct FreeList { impl FileCache { pub fn new( file_cache_path: &Path, - initial_size: u64, + mut initial_size: u64, uring_system: tokio_epoll_uring::SystemHandle, ) -> Result { + if initial_size < 100 { + tracing::warn!("min size for file cache is 100 blocks, {} requested", initial_size); + initial_size = 100; + } + let file = std::fs::OpenOptions::new() .read(true) .write(true) @@ -49,7 +57,16 @@ impl FileCache { .create(true) .open(file_cache_path)?; - tracing::info!("Created cache file {file_cache_path:?}"); + let max_blocks_gauge = metrics::IntGauge::new( + "file_cache_max_blocks", + "Local File Cache size in 8KiB blocks", + ).unwrap(); + let num_free_blocks_gauge = metrics::IntGauge::new( + "file_cache_num_free_blocks", + "Number of free 8KiB blocks in Local File Cache", + ).unwrap(); + + tracing::info!("initialized file cache with {} blocks", initial_size); Ok(FileCache { file: Arc::new(file), @@ -59,6 +76,8 @@ impl FileCache { max_blocks: initial_size, free_blocks: Vec::new(), }), + max_blocks_gauge, + num_free_blocks_gauge, }) } @@ -112,7 +131,7 @@ impl FileCache { } if free_list.next_free_block < free_list.max_blocks { let result = free_list.next_free_block; - free_list.next_free_block -= 1; + free_list.next_free_block += 1; return Some(result); } None @@ -132,3 +151,29 @@ fn map_io_uring_error(err: tokio_epoll_uring::Error) -> std::io: } } } + +impl metrics::core::Collector for FileCache { + fn desc(&self) -> Vec<&metrics::core::Desc> { + let mut descs = Vec::new(); + descs.append(&mut self.max_blocks_gauge.desc()); + descs.append(&mut self.num_free_blocks_gauge.desc()); + descs + } + fn collect(&self) -> Vec { + // Update the gauges with fresh values first + { + let free_list = self.free_list.lock().unwrap(); + self.max_blocks_gauge.set(free_list.max_blocks as i64); + + let total_free_blocks: i64 = + free_list.free_blocks.len() as i64 + + (free_list.max_blocks as i64 - free_list.next_free_block as i64); + self.num_free_blocks_gauge.set(total_free_blocks as i64); + } + + let mut values = Vec::new(); + values.append(&mut self.max_blocks_gauge.collect()); + values.append(&mut self.num_free_blocks_gauge.collect()); + values + } +} diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index a9ba1930e0..5b7af67722 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -58,7 +58,7 @@ pub struct IntegratedCacheWriteAccess<'t> { global_lw_lsn: AtomicU64, - file_cache: Option, + pub(crate) file_cache: Option, // Fields for eviction clock_hand: std::sync::Mutex>, @@ -223,7 +223,7 @@ impl From<(&RelTag, u32)> for TreeKey { } impl neonart::Key for TreeKey { - const KEY_LEN: usize = 4 + 4 + 4 + 1 + 32; + const KEY_LEN: usize = 4 + 4 + 4 + 1 + 4; fn as_bytes(&self) -> &[u8] { zerocopy::IntoBytes::as_bytes(self) @@ -268,6 +268,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } else { panic!("unexpected tree entry type for block key"); }; + block_entry.referenced.store(true, Ordering::Relaxed); if let Some(cache_block) = block_entry.cache_block { self.file_cache @@ -298,6 +299,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> { panic!("unexpected tree entry type for block key"); }; + // This is used for prefetch requests. Treat the probe as an 'access', to keep it + // in cache. + block_entry.referenced.store(true, Ordering::Relaxed); + if let Some(_cache_block) = block_entry.cache_block { Ok(CacheResult::Found(())) } else { @@ -373,6 +378,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } else { panic!("unexpected tree entry type for block key"); }; + block_entry.referenced.store(true, Ordering::Relaxed); block_entry.lw_lsn = lw_lsn; if block_entry.cache_block.is_none() { block_entry.cache_block = reserved_cache_block.take(); @@ -389,6 +395,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } }); + // If we didn't need to block we reserved, put it back to the free list if let Some(x) = reserved_cache_block { file_cache.dealloc_block(x); } @@ -422,12 +429,13 @@ impl<'t> IntegratedCacheWriteAccess<'t> { /// caller to use immediately. pub fn try_evict_one_cache_block(&self) -> Option { let mut clock_hand = self.clock_hand.lock().unwrap(); - for _ in 0..1000 { + for _ in 0..100 { let r = self.cache_tree.start_read(); match clock_hand.next(&r) { None => { // The cache is completely empty. Pretty unexpected that this function // was called then.. + break; }, Some((_k, TreeEntry::Rel(_))) => { // ignore rel entries for now. @@ -512,6 +520,7 @@ impl<'e> BackendCacheReadOp<'e> { } else { panic!("unexpected tree entry type for block key"); }; + block_entry.referenced.store(true, Ordering::Relaxed); block_entry.cache_block } else { diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index afb12e4e4e..336dcb3c1f 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -390,3 +390,21 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { Ok(()) } } + + +impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> { + fn desc(&self) -> Vec<&metrics::core::Desc> { + let mut descs = Vec::new(); + if let Some(file_cache) = &self.cache.file_cache { + descs.append(&mut file_cache.desc()); + } + descs + } + fn collect(&self) -> Vec { + let mut values = Vec::new(); + if let Some(file_cache) = &self.cache.file_cache { + values.append(&mut file_cache.collect()); + } + values + } +} diff --git a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs new file mode 100644 index 0000000000..e9efb92a6d --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs @@ -0,0 +1,69 @@ +//! Export information about Postgres, the communicator process, file cache etc. as +//! prometheus metrics. + +use axum::Router; +use axum::extract::State; +use axum::body::Body; +use axum::response::Response; +use http::StatusCode; +use http::header::CONTENT_TYPE; + +use metrics::proto::MetricFamily; +use metrics::{Encoder, TextEncoder}; +use metrics; + +use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct; + +impl<'a> CommunicatorWorkerProcessStruct<'a> { + pub(crate) async fn launch_exporter_task(&'static self) { + use axum::routing::get; + let app = Router::new() + .route("/metrics", get(get_metrics)) + .with_state(self); + + // TODO: make configurable. Or listen on unix domain socket? + let listener = tokio::net::TcpListener::bind("127.0.0.1:9090").await.unwrap(); + + tokio::spawn(async { + tracing::info!("metrics listener spawned"); + axum::serve(listener, app).await.unwrap() + }); + } +} + +/// Expose Prometheus metrics. +async fn get_metrics( + State(state): State<&CommunicatorWorkerProcessStruct<'static>> +) -> Response { + tracing::warn!("get_metrics called"); + + use metrics::core::Collector; + let metrics = state.collect(); + + // When we call TextEncoder::encode() below, it will immediately return an + // error if a metric family has no metrics, so we need to preemptively + // filter out metric families with no metrics. + let metrics = metrics + .into_iter() + .filter(|m| !m.get_metric().is_empty()) + .collect::>(); + + let encoder = TextEncoder::new(); + let mut buffer = vec![]; + + tracing::warn!("get_metrics done"); + + if let Err(e) = encoder.encode(&metrics, &mut buffer) { + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header(CONTENT_TYPE, "application/text") + .body(Body::from(e.to_string())) + .unwrap() + } else { + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() + } +} diff --git a/pgxn/neon/communicator/src/worker_process/mod.rs b/pgxn/neon/communicator/src/worker_process/mod.rs index edd35bfdcc..760d8853b0 100644 --- a/pgxn/neon/communicator/src/worker_process/mod.rs +++ b/pgxn/neon/communicator/src/worker_process/mod.rs @@ -8,4 +8,5 @@ mod callbacks; mod logging; mod main_loop; +mod metrics_exporter; mod worker_interface; diff --git a/pgxn/neon/communicator/src/worker_process/worker_interface.rs b/pgxn/neon/communicator/src/worker_process/worker_interface.rs index 053b551235..562a40fbf9 100644 --- a/pgxn/neon/communicator/src/worker_process/worker_interface.rs +++ b/pgxn/neon/communicator/src/worker_process/worker_interface.rs @@ -65,6 +65,8 @@ pub extern "C" fn communicator_worker_process_launch( error!("error: {err:?}"); }); + runtime.block_on(worker_struct.launch_exporter_task()); + // keep the runtime running after we exit this function Box::leak(Box::new(runtime)); } diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index 87b26926eb..64dfca395b 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -211,6 +211,10 @@ communicator_new_bgworker_main(Datum main_arg) struct LoggingState *logging; char errbuf[1000]; int elevel; + uint64 initial_file_cache_size; + + /* lfc_size_limit is in MBs */ + initial_file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ); /* Establish signal handlers. */ pqsignal(SIGUSR1, procsignal_sigusr1_handler); @@ -231,7 +235,7 @@ communicator_new_bgworker_main(Datum main_arg) connstrs, num_shards, lfc_path, - lfc_size_limit); + initial_file_cache_size); cis = NULL; elog(LOG, "communicator threads started");