From 8a4f16a471ec2575c3fdbd968e77215f8122046f Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 31 Jul 2025 14:09:39 +0300 Subject: [PATCH] More work on metrics Switch to the 'measured' crate everywhere in the communicator. Connect the allocator metrics to the metrics endpoint. --- Cargo.lock | 1 - pgxn/neon/communicator/Cargo.toml | 1 - pgxn/neon/communicator/src/file_cache.rs | 56 ++++---- .../neon/communicator/src/global_allocator.rs | 81 +++++------ .../neon/communicator/src/integrated_cache.rs | 134 +++++++----------- .../src/worker_process/main_loop.rs | 11 +- 6 files changed, 116 insertions(+), 168 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43077f1b05..7404ff0587 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1315,7 +1315,6 @@ dependencies = [ "http 1.3.1", "libc", "measured", - "metrics", "neon-shmem", "nix 0.30.1", "pageserver_api", diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml index 822d4f749e..7ff0e0b29a 100644 --- a/pgxn/neon/communicator/Cargo.toml +++ b/pgxn/neon/communicator/Cargo.toml @@ -34,7 +34,6 @@ tokio-pipe = { version = "0.2.12" } tracing.workspace = true tracing-subscriber.workspace = true -metrics.workspace = true uring-common = { workspace = true, features = ["bytes"] } pageserver_client_grpc.workspace = true diff --git a/pgxn/neon/communicator/src/file_cache.rs b/pgxn/neon/communicator/src/file_cache.rs index f153174c6b..4470bdfb37 100644 --- a/pgxn/neon/communicator/src/file_cache.rs +++ b/pgxn/neon/communicator/src/file_cache.rs @@ -14,6 +14,11 @@ use std::path::Path; use std::sync::Arc; use std::sync::Mutex; +use measured::{Gauge, MetricGroup}; +use measured::metric::gauge::GaugeState; +use measured::metric::MetricEncoding; +use measured::metric; + use crate::BLCKSZ; use tokio::task::spawn_blocking; @@ -22,15 +27,22 @@ pub type CacheBlock = u64; pub const INVALID_CACHE_BLOCK: CacheBlock = u64::MAX; -#[derive(Debug)] pub struct FileCache { file: Arc, free_list: Mutex, - // metrics - max_blocks_gauge: metrics::IntGauge, - num_free_blocks_gauge: metrics::IntGauge, + metrics: FileCacheMetricGroup, +} + +#[derive(MetricGroup)] +#[metric(new())] +struct FileCacheMetricGroup { + /// Local File Cache size in 8KiB blocks + max_blocks: Gauge, + + /// Number of free 8KiB blocks in Local File Cache + num_free_blocks: Gauge, } // TODO: We keep track of all free blocks in this vec. That doesn't really scale. @@ -61,17 +73,6 @@ impl FileCache { .create(true) .open(file_cache_path)?; - let max_blocks_gauge = metrics::IntGauge::new( - "file_cache_max_blocks", - "Local File Cache size in 8KiB blocks", - ) - .unwrap(); - let num_free_blocks_gauge = metrics::IntGauge::new( - "file_cache_num_free_blocks", - "Number of free 8KiB blocks in Local File Cache", - ) - .unwrap(); - tracing::info!("initialized file cache with {} blocks", initial_size); Ok(FileCache { @@ -81,8 +82,7 @@ impl FileCache { max_blocks: initial_size, free_blocks: Vec::new(), }), - max_blocks_gauge, - num_free_blocks_gauge, + metrics: FileCacheMetricGroup::new(), }) } @@ -136,27 +136,21 @@ impl FileCache { } } -impl metrics::core::Collector for FileCache { - fn desc(&self) -> Vec<&metrics::core::Desc> { - let mut descs = Vec::new(); - descs.append(&mut self.max_blocks_gauge.desc()); - descs.append(&mut self.num_free_blocks_gauge.desc()); - descs - } - fn collect(&self) -> Vec { +impl MetricGroup for FileCache +where + GaugeState: MetricEncoding, +{ + fn collect_group_into(&self, enc: &mut T) -> Result<(), ::Err> { // Update the gauges with fresh values first { let free_list = self.free_list.lock().unwrap(); - self.max_blocks_gauge.set(free_list.max_blocks as i64); + self.metrics.max_blocks.set(free_list.max_blocks as i64); let total_free_blocks: i64 = free_list.free_blocks.len() as i64 + (free_list.max_blocks as i64 - free_list.next_free_block as i64); - self.num_free_blocks_gauge.set(total_free_blocks); + self.metrics.num_free_blocks.set(total_free_blocks); } - let mut values = Vec::new(); - values.append(&mut self.max_blocks_gauge.collect()); - values.append(&mut self.num_free_blocks_gauge.collect()); - values + self.metrics.collect_group_into(enc) } } diff --git a/pgxn/neon/communicator/src/global_allocator.rs b/pgxn/neon/communicator/src/global_allocator.rs index 0c8e88071f..e7a08d23d2 100644 --- a/pgxn/neon/communicator/src/global_allocator.rs +++ b/pgxn/neon/communicator/src/global_allocator.rs @@ -18,9 +18,12 @@ use std::alloc::{GlobalAlloc, Layout, System}; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; -use metrics::IntGauge; +use measured::{Gauge, MetricGroup}; +use measured::metric::gauge::GaugeState; +use measured::metric::MetricEncoding; +use measured::metric; -struct MyAllocator { +pub(crate) struct MyAllocator { allocations: AtomicU64, deallocations: AtomicU64, @@ -28,6 +31,22 @@ struct MyAllocator { high: AtomicUsize, } +#[derive(MetricGroup)] +#[metric(new())] +struct MyAllocatorMetricGroup { + /// Number of allocations in Rust code + communicator_mem_allocations: Gauge, + + /// Number of deallocations in Rust code + communicator_mem_deallocations: Gauge, + + /// Bytes currently allocated + communicator_mem_allocated: Gauge, + + /// High watermark of allocated bytes + communicator_mem_high: Gauge, +} + unsafe impl GlobalAlloc for MyAllocator { unsafe fn alloc(&self, layout: Layout) -> *mut u8 { self.allocations.fetch_add(1, Ordering::Relaxed); @@ -52,58 +71,32 @@ static GLOBAL: MyAllocator = MyAllocator { high: AtomicUsize::new(0), }; -pub struct MyAllocatorCollector { - allocations: IntGauge, - deallocations: IntGauge, - allocated: IntGauge, - high: IntGauge, +pub(crate) struct MyAllocatorCollector { + metrics: MyAllocatorMetricGroup, } impl MyAllocatorCollector { - pub fn new() -> MyAllocatorCollector { - MyAllocatorCollector { - allocations: IntGauge::new("allocations_total", "Number of allocations in Rust code") - .unwrap(), - deallocations: IntGauge::new( - "deallocations_total", - "Number of deallocations in Rust code", - ) - .unwrap(), - allocated: IntGauge::new("allocated_total", "Bytes currently allocated").unwrap(), - high: IntGauge::new("allocated_high", "High watermark of allocated bytes").unwrap(), + pub(crate) fn new() -> Self { + Self { + metrics: MyAllocatorMetricGroup::new(), } } } -impl metrics::core::Collector for MyAllocatorCollector { - fn desc(&self) -> Vec<&metrics::core::Desc> { - let mut descs = Vec::new(); - - descs.append(&mut self.allocations.desc()); - descs.append(&mut self.deallocations.desc()); - descs.append(&mut self.allocated.desc()); - descs.append(&mut self.high.desc()); - - descs - } - - fn collect(&self) -> Vec { - let mut values = Vec::new(); - - // update the gauges - self.allocations +impl MetricGroup for MyAllocatorCollector +where + GaugeState: MetricEncoding, +{ + fn collect_group_into(&self, enc: &mut T) -> Result<(), ::Err> { + // Update the gauges with fresh values first + self.metrics.communicator_mem_allocations .set(GLOBAL.allocations.load(Ordering::Relaxed) as i64); - self.deallocations + self.metrics.communicator_mem_deallocations .set(GLOBAL.allocations.load(Ordering::Relaxed) as i64); - self.allocated + self.metrics.communicator_mem_allocated .set(GLOBAL.allocated.load(Ordering::Relaxed) as i64); - self.high.set(GLOBAL.high.load(Ordering::Relaxed) as i64); + self.metrics.communicator_mem_high.set(GLOBAL.high.load(Ordering::Relaxed) as i64); - values.append(&mut self.allocations.collect()); - values.append(&mut self.deallocations.collect()); - values.append(&mut self.allocated.collect()); - values.append(&mut self.high.collect()); - - values + self.metrics.collect_group_into(enc) } } diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index 0e32f45d32..b4bb994980 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -32,7 +32,11 @@ use crate::file_cache::{CacheBlock, FileCache}; use crate::init::alloc_from_slice; use pageserver_page_api::RelTag; -use metrics::{IntCounter, IntGauge}; +use measured::{Counter, Gauge, MetricGroup}; +use measured::metric::counter::CounterState; +use measured::metric::gauge::GaugeState; +use measured::metric::MetricEncoding; +use measured::metric; use neon_shmem::hash::{HashMapInit, entry::Entry}; use neon_shmem::shmem::ShmemHandle; @@ -55,7 +59,6 @@ pub struct IntegratedCacheShared { } /// Represents write-access to the integrated cache. This is used by the communicator process. -#[derive(Debug)] pub struct IntegratedCacheWriteAccess<'t> { shared: &'t IntegratedCacheShared, relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>, @@ -66,17 +69,34 @@ pub struct IntegratedCacheWriteAccess<'t> { // Fields for eviction clock_hand: AtomicUsize, - // Metrics - cache_page_evictions_counter: IntCounter, - block_entry_evictions_counter: IntCounter, - clock_iterations_counter: IntCounter, + metrics: IntegratedCacheMetricGroup, +} + +#[derive(MetricGroup)] +#[metric(new())] +struct IntegratedCacheMetricGroup { + /// Page evictions from the Local File Cache + cache_page_evictions_counter: Counter, + + /// Block entry evictions from the integrated cache + block_entry_evictions_counter: Counter, + + /// Number of times the clock hand has moved + clock_iterations_counter: Counter, // metrics from the hash map - block_map_num_buckets: IntGauge, - block_map_num_buckets_in_use: IntGauge, - relsize_cache_num_buckets: IntGauge, - relsize_cache_num_buckets_in_use: IntGauge, + /// Allocated size of the block cache hash map + block_map_num_buckets: Gauge, + + /// Number of buckets in use in the block cache hash map + block_map_num_buckets_in_use: Gauge, + + /// Allocated size of the relsize cache hash map + relsize_cache_num_buckets: Gauge, + + /// Number of buckets in use in the relsize cache hash map + relsize_cache_num_buckets_in_use: Gauge, } /// Represents read-only access to the integrated cache. Backend processes have this. @@ -151,46 +171,7 @@ impl<'t> IntegratedCacheInitStruct<'t> { block_map: block_map_handle.attach_writer(), file_cache, clock_hand: AtomicUsize::new(0), - - cache_page_evictions_counter: metrics::IntCounter::new( - "integrated_cache_page_evictions", - "Page evictions from the Local File Cache", - ) - .unwrap(), - - block_entry_evictions_counter: metrics::IntCounter::new( - "integrated_cache_block_entry_evictions", - "Block entry evictions from the integrated cache", - ) - .unwrap(), - - clock_iterations_counter: metrics::IntCounter::new( - "clock_iterations", - "Number of times the clock hand has moved", - ) - .unwrap(), - - block_map_num_buckets: metrics::IntGauge::new( - "block_map_num_buckets", - "Allocated size of the block cache hash map", - ) - .unwrap(), - block_map_num_buckets_in_use: metrics::IntGauge::new( - "block_map_num_buckets_in_use", - "Number of buckets in use in the block cache hash map", - ) - .unwrap(), - - relsize_cache_num_buckets: metrics::IntGauge::new( - "relsize_cache_num_buckets", - "Allocated size of the relsize cache hash map", - ) - .unwrap(), - relsize_cache_num_buckets_in_use: metrics::IntGauge::new( - "relsize_cache_num_buckets_in_use", - "Number of buckets in use in the relsize cache hash map", - ) - .unwrap(), + metrics: IntegratedCacheMetricGroup::new(), } } @@ -642,7 +623,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { fn try_evict_block_entry(&self) { let num_buckets = self.block_map.get_num_buckets(); loop { - self.clock_iterations_counter.inc(); + self.metrics.clock_iterations_counter.inc(); let victim_bucket = self.clock_hand.fetch_add(1, Ordering::Relaxed) % num_buckets; let evict_this = match self.block_map.get_at_bucket(victim_bucket).as_deref() { @@ -693,7 +674,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let num_buckets = self.block_map.get_num_buckets(); let mut iterations = 0; while iterations < 100 { - self.clock_iterations_counter.inc(); + self.metrics.clock_iterations_counter.inc(); let victim_bucket = self.clock_hand.fetch_add(1, Ordering::Relaxed) % num_buckets; let evict_this = match self.block_map.get_at_bucket(victim_bucket).as_deref() { @@ -759,9 +740,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> { n => Some(n), }; if evicted_cache_block.is_some() { - self.cache_page_evictions_counter.inc(); + self.metrics.cache_page_evictions_counter.inc(); } - self.block_entry_evictions_counter.inc(); + self.metrics.block_entry_evictions_counter.inc(); EvictResult::Evicted(evicted_cache_block) } else { EvictResult::Pinned @@ -794,44 +775,27 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } } -impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { - fn desc(&self) -> Vec<&metrics::core::Desc> { - let mut descs = Vec::new(); - descs.append(&mut self.cache_page_evictions_counter.desc()); - descs.append(&mut self.block_entry_evictions_counter.desc()); - descs.append(&mut self.clock_iterations_counter.desc()); - - descs.append(&mut self.block_map_num_buckets.desc()); - descs.append(&mut self.block_map_num_buckets_in_use.desc()); - - descs.append(&mut self.relsize_cache_num_buckets.desc()); - descs.append(&mut self.relsize_cache_num_buckets_in_use.desc()); - - descs - } - fn collect(&self) -> Vec { +impl MetricGroup for IntegratedCacheWriteAccess<'_> +where + CounterState: MetricEncoding, + GaugeState: MetricEncoding, +{ + fn collect_group_into(&self, enc: &mut T) -> Result<(), ::Err> { // Update gauges - self.block_map_num_buckets + self.metrics.block_map_num_buckets .set(self.block_map.get_num_buckets() as i64); - self.block_map_num_buckets_in_use + self.metrics.block_map_num_buckets_in_use .set(self.block_map.get_num_buckets_in_use() as i64); - self.relsize_cache_num_buckets + self.metrics.relsize_cache_num_buckets .set(self.relsize_cache.get_num_buckets() as i64); - self.relsize_cache_num_buckets_in_use + self.metrics.relsize_cache_num_buckets_in_use .set(self.relsize_cache.get_num_buckets_in_use() as i64); - let mut values = Vec::new(); - values.append(&mut self.cache_page_evictions_counter.collect()); - values.append(&mut self.block_entry_evictions_counter.collect()); - values.append(&mut self.clock_iterations_counter.collect()); + if let Some(file_cache) = &self.file_cache { + file_cache.collect_group_into(enc)?; + } - values.append(&mut self.block_map_num_buckets.collect()); - values.append(&mut self.block_map_num_buckets_in_use.collect()); - - values.append(&mut self.relsize_cache_num_buckets.collect()); - values.append(&mut self.relsize_cache_num_buckets_in_use.collect()); - - values + self.metrics.collect_group_into(enc) } } diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 10a5fd81e5..2eabed2219 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -23,6 +23,7 @@ use uring_common::buf::IoBuf; use measured::MetricGroup; use measured::metric::MetricEncoding; +use measured::metric::counter::CounterState; use measured::metric::gauge::GaugeState; use measured::metric::group::Encoding; use measured::{Gauge, GaugeVec}; @@ -30,7 +31,7 @@ use utils::id::{TenantId, TimelineId}; use super::callbacks::{get_request_lsn, notify_proc}; -use tracing::{debug, error, info, info_span, trace}; +use tracing::{error, info, info_span, trace}; use utils::lsn::Lsn; @@ -65,7 +66,6 @@ pub struct CommunicatorWorkerProcessStruct<'a> { // For the requests that affect multiple blocks, have separate counters for the # of blocks affected request_nblocks_counters: GaugeVec, - #[allow(dead_code)] allocator_metrics: MyAllocatorCollector, } @@ -145,8 +145,6 @@ pub(super) fn init( .integrated_cache_init_struct .worker_process_init(last_lsn, file_cache); - debug!("Initialised integrated cache: {cache:?}"); - let client = { let _guard = runtime.enter(); PageserverClient::new( @@ -818,6 +816,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { impl MetricGroup for CommunicatorWorkerProcessStruct<'_> where T: Encoding, + CounterState: MetricEncoding, GaugeState: MetricEncoding, { fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> { @@ -825,12 +824,12 @@ where use measured::metric::name::MetricName; self.lfc_metrics.collect_group_into(enc)?; + self.cache.collect_group_into(enc)?; self.request_counters .collect_family_into(MetricName::from_str("request_counters"), enc)?; self.request_nblocks_counters .collect_family_into(MetricName::from_str("request_nblocks_counters"), enc)?; - - // FIXME: allocator metrics + self.allocator_metrics.collect_group_into(enc)?; Ok(()) }