diff --git a/Cargo.lock b/Cargo.lock index 68c9dadbe0..5e8d396519 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1323,6 +1323,7 @@ dependencies = [ "pageserver_page_api", "prometheus", "prost 0.13.5", + "strum_macros", "thiserror 1.0.69", "tokio", "tokio-pipe", diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml index 0901b66428..acbc881452 100644 --- a/pgxn/neon/communicator/Cargo.toml +++ b/pgxn/neon/communicator/Cargo.toml @@ -25,6 +25,7 @@ atomic_enum = "0.3.0" measured.workspace = true prometheus.workspace = true prost.workspace = true +strum_macros.workspace = true thiserror.workspace = true tonic = { workspace = true, default-features = false, features=["codegen", "prost", "transport"] } tokio = { workspace = true, features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] } diff --git a/pgxn/neon/communicator/src/neon_request.rs b/pgxn/neon/communicator/src/neon_request.rs index d40e7484f9..78eeecb720 100644 --- a/pgxn/neon/communicator/src/neon_request.rs +++ b/pgxn/neon/communicator/src/neon_request.rs @@ -24,6 +24,8 @@ use pageserver_page_api::{self as page_api, SlruKind}; #[allow(clippy::large_enum_variant)] #[repr(C)] #[derive(Copy, Clone, Debug)] +#[derive(strum_macros::EnumDiscriminants)] +#[strum_discriminants(derive(measured::FixedCardinalityLabel))] pub enum NeonIORequest { Empty, diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 00a684e91a..0f4202b571 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -16,8 +16,6 @@ use crate::worker_process::lfc_metrics::LfcMetricsCollector; use pageserver_client_grpc::{PageserverClient, ShardSpec, ShardStripeSize}; use pageserver_page_api as page_api; -use metrics::{IntCounter, IntCounterVec}; - use tokio::io::AsyncReadExt; use tokio_pipe::PipeRead; use uring_common::buf::IoBuf; @@ -26,6 +24,7 @@ use measured::MetricGroup; use measured::metric::MetricEncoding; use measured::metric::gauge::GaugeState; use measured::metric::group::Encoding; +use measured::{Gauge, GaugeVec}; use utils::id::{TenantId, TimelineId}; use super::callbacks::{get_request_lsn, notify_proc}; @@ -62,30 +61,33 @@ pub struct CommunicatorWorkerProcessStruct<'a> { /*** Metrics ***/ pub(crate) lfc_metrics: LfcMetricsCollector, - request_counters: IntCounterVec, - request_rel_size_counter: IntCounter, - request_get_pagev_counter: IntCounter, - request_read_slru_segment_counter: IntCounter, - request_prefetchv_counter: IntCounter, - request_db_size_counter: IntCounter, - request_write_page_counter: IntCounter, - request_rel_extend_counter: IntCounter, - request_rel_zero_extend_counter: IntCounter, - request_rel_create_counter: IntCounter, - request_rel_truncate_counter: IntCounter, - request_rel_unlink_counter: IntCounter, + request_counters: GaugeVec, - getpage_cache_misses_counter: IntCounter, - getpage_cache_hits_counter: IntCounter, + getpage_cache_misses_counter: Gauge, + getpage_cache_hits_counter: Gauge, - request_nblocks_counters: IntCounterVec, - request_get_pagev_nblocks_counter: IntCounter, - request_prefetchv_nblocks_counter: IntCounter, - request_rel_zero_extend_nblocks_counter: IntCounter, + // For the requests that affect multiple blocks, have separate counters for the # of blocks affected + request_nblocks_counters: GaugeVec, + #[allow(dead_code)] allocator_metrics: MyAllocatorCollector, } +// Define a label group, consisting of 1 or more label values +#[derive(measured::LabelGroup)] +#[label(set = RequestTypeLabelGroupSet)] +struct RequestTypeLabelGroup { + request_type: crate::neon_request::NeonIORequestDiscriminants, +} + +impl RequestTypeLabelGroup { + fn from_req(req: &NeonIORequest) -> Self { + RequestTypeLabelGroup { + request_type: req.into(), + } + } +} + /// Launch the communicator process's Rust subsystems pub(super) fn init( cis: Box, @@ -150,54 +152,6 @@ pub(super) fn init( None }; - let request_counters = IntCounterVec::new( - metrics::core::Opts::new( - "backend_requests_total", - "Number of requests from backends.", - ), - &["request_kind"], - ) - .unwrap(); - let request_rel_size_counter = request_counters.with_label_values(&["rel_size"]); - let request_get_pagev_counter = request_counters.with_label_values(&["get_pagev"]); - let request_read_slru_segment_counter = - request_counters.with_label_values(&["read_slru_segment"]); - let request_prefetchv_counter = request_counters.with_label_values(&["prefetchv"]); - let request_db_size_counter = request_counters.with_label_values(&["db_size"]); - let request_write_page_counter = request_counters.with_label_values(&["write_page"]); - let request_rel_extend_counter = request_counters.with_label_values(&["rel_extend"]); - let request_rel_zero_extend_counter = request_counters.with_label_values(&["rel_zero_extend"]); - let request_rel_create_counter = request_counters.with_label_values(&["rel_create"]); - let request_rel_truncate_counter = request_counters.with_label_values(&["rel_truncate"]); - let request_rel_unlink_counter = request_counters.with_label_values(&["rel_unlink"]); - - let getpage_cache_misses_counter = IntCounter::new( - "getpage_cache_misses", - "Number of file cache misses in get_pagev requests.", - ) - .unwrap(); - let getpage_cache_hits_counter = IntCounter::new( - "getpage_cache_hits", - "Number of file cache hits in get_pagev requests.", - ) - .unwrap(); - - // For the requests that affect multiple blocks, have separate counters for the # of blocks affected - let request_nblocks_counters = IntCounterVec::new( - metrics::core::Opts::new( - "request_nblocks_total", - "Number of blocks in backend requests.", - ), - &["request_kind"], - ) - .unwrap(); - let request_get_pagev_nblocks_counter = - request_nblocks_counters.with_label_values(&["get_pagev"]); - let request_prefetchv_nblocks_counter = - request_nblocks_counters.with_label_values(&["prefetchv"]); - let request_rel_zero_extend_nblocks_counter = - request_nblocks_counters.with_label_values(&["rel_zero_extend"]); - let worker_struct = CommunicatorWorkerProcessStruct { // Note: it's important to not drop the runtime, or all the tasks are dropped // too. Including it in the returned struct is one way to keep it around. @@ -212,26 +166,12 @@ pub(super) fn init( // metrics lfc_metrics: LfcMetricsCollector, - request_counters, - request_rel_size_counter, - request_get_pagev_counter, - request_read_slru_segment_counter, - request_prefetchv_counter, - request_db_size_counter, - request_write_page_counter, - request_rel_extend_counter, - request_rel_zero_extend_counter, - request_rel_create_counter, - request_rel_truncate_counter, - request_rel_unlink_counter, + request_counters: GaugeVec::new(), - getpage_cache_misses_counter, - getpage_cache_hits_counter, + getpage_cache_misses_counter: Gauge::new(), + getpage_cache_hits_counter: Gauge::new(), - request_nblocks_counters, - request_get_pagev_nblocks_counter, - request_prefetchv_nblocks_counter, - request_rel_zero_extend_nblocks_counter, + request_nblocks_counters: GaugeVec::new(), allocator_metrics: MyAllocatorCollector::new(), }; @@ -389,18 +329,19 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { } /// Handle one IO request - async fn handle_request(&'static self, req: &'_ NeonIORequest) -> NeonIOResult { + async fn handle_request(&'static self, request: &'_ NeonIORequest) -> NeonIOResult { let client = self .client .as_ref() .expect("cannot handle requests without client"); - match req { + + self.request_counters.inc(RequestTypeLabelGroup::from_req(request)); + match request { NeonIORequest::Empty => { error!("unexpected Empty IO request"); NeonIOResult::Error(0) } NeonIORequest::RelSize(req) => { - self.request_rel_size_counter.inc(); let rel = req.reltag(); let _in_progress_guard = self @@ -452,16 +393,12 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { } } NeonIORequest::GetPageV(req) => { - self.request_get_pagev_counter.inc(); - self.request_get_pagev_nblocks_counter - .inc_by(req.nblocks as u64); match self.handle_get_pagev_request(req).await { Ok(()) => NeonIOResult::GetPageV, Err(errno) => NeonIOResult::Error(errno), } } NeonIORequest::ReadSlruSegment(req) => { - self.request_read_slru_segment_counter.inc(); let lsn = Lsn(req.request_lsn); let file_path = req.destination_file_path(); @@ -490,15 +427,12 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { } } NeonIORequest::PrefetchV(req) => { - self.request_prefetchv_counter.inc(); - self.request_prefetchv_nblocks_counter - .inc_by(req.nblocks as u64); + self.request_nblocks_counters.inc_by(RequestTypeLabelGroup::from_req(request), req.nblocks as i64); let req = *req; tokio::spawn(async move { self.handle_prefetchv_request(&req).await }); NeonIOResult::PrefetchVLaunched } NeonIORequest::DbSize(req) => { - self.request_db_size_counter.inc(); let _in_progress_guard = self .in_progress_table .lock(RequestInProgressKey::Db(req.db_oid), req.request_id) @@ -530,8 +464,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // Write requests NeonIORequest::WritePage(req) => { - self.request_write_page_counter.inc(); - let rel = req.reltag(); let _in_progress_guard = self .in_progress_table @@ -549,8 +481,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { NeonIOResult::WriteOK } NeonIORequest::RelExtend(req) => { - self.request_rel_extend_counter.inc(); - let rel = req.reltag(); let _in_progress_guard = self .in_progress_table @@ -570,9 +500,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { NeonIOResult::WriteOK } NeonIORequest::RelZeroExtend(req) => { - self.request_rel_zero_extend_counter.inc(); - self.request_rel_zero_extend_nblocks_counter - .inc_by(req.nblocks as u64); + self.request_nblocks_counters.inc_by(RequestTypeLabelGroup::from_req(request), req.nblocks as i64); // TODO: need to grab an io-in-progress lock for this? I guess not // TODO: We could put the empty pages to the cache. Maybe have @@ -586,23 +514,17 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { NeonIOResult::WriteOK } NeonIORequest::RelCreate(req) => { - self.request_rel_create_counter.inc(); - // TODO: need to grab an io-in-progress lock for this? I guess not self.cache.remember_rel_size(&req.reltag(), 0, Lsn(req.lsn)); NeonIOResult::WriteOK } NeonIORequest::RelTruncate(req) => { - self.request_rel_truncate_counter.inc(); - // TODO: need to grab an io-in-progress lock for this? I guess not self.cache .remember_rel_size(&req.reltag(), req.nblocks, Lsn(req.lsn)); NeonIOResult::WriteOK } NeonIORequest::RelUnlink(req) => { - self.request_rel_unlink_counter.inc(); - // TODO: need to grab an io-in-progress lock for this? I guess not self.cache.forget_rel(&req.reltag(), None, Lsn(req.lsn)); NeonIOResult::WriteOK @@ -658,9 +580,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { cache_misses.push((blkno, not_modified_since, dest, in_progress_guard)); } self.getpage_cache_misses_counter - .inc_by(cache_misses.len() as u64); + .inc_by(cache_misses.len() as i64); self.getpage_cache_hits_counter - .inc_by(req.nblocks as u64 - cache_misses.len() as u64); + .inc_by(req.nblocks as i64 - cache_misses.len() as i64); if cache_misses.is_empty() { return Ok(()); @@ -824,47 +746,21 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { } } -impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> { - fn desc(&self) -> Vec<&metrics::core::Desc> { - let mut descs = Vec::new(); - - descs.append(&mut self.request_counters.desc()); - descs.append(&mut self.getpage_cache_misses_counter.desc()); - descs.append(&mut self.getpage_cache_hits_counter.desc()); - descs.append(&mut self.request_nblocks_counters.desc()); - - if let Some(file_cache) = &self.cache.file_cache { - descs.append(&mut file_cache.desc()); - } - descs.append(&mut self.cache.desc()); - descs.append(&mut self.allocator_metrics.desc()); - - descs - } - fn collect(&self) -> Vec { - let mut values = Vec::new(); - - values.append(&mut self.request_counters.collect()); - values.append(&mut self.getpage_cache_misses_counter.collect()); - values.append(&mut self.getpage_cache_hits_counter.collect()); - values.append(&mut self.request_nblocks_counters.collect()); - - if let Some(file_cache) = &self.cache.file_cache { - values.append(&mut file_cache.collect()); - } - values.append(&mut self.cache.collect()); - values.append(&mut self.allocator_metrics.collect()); - - values - } -} - impl MetricGroup for CommunicatorWorkerProcessStruct<'_> where T: Encoding, GaugeState: MetricEncoding, { fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> { - self.lfc_metrics.collect_group_into(enc) + use measured::metric::MetricFamilyEncoding; + use measured::metric::name::MetricName; + + self.lfc_metrics.collect_group_into(enc)?; + self.request_counters.collect_family_into(MetricName::from_str("request_counters"), enc)?; + self.request_nblocks_counters.collect_family_into(MetricName::from_str("request_nblocks_counters"), enc)?; + + // FIXME: allocator metrics + + Ok(()) } }