refactor metrics to use 'measured' crate

Heikki Linnakangas
2025-07-23 00:56:21 +03:00
parent 48535798ba
commit c18f4a52f8
4 changed files with 48 additions and 148 deletions

Cargo.lock (generated)

@@ -1323,6 +1323,7 @@ dependencies = [
"pageserver_page_api",
"prometheus",
"prost 0.13.5",
"strum_macros",
"thiserror 1.0.69",
"tokio",
"tokio-pipe",

Cargo.toml

@@ -25,6 +25,7 @@ atomic_enum = "0.3.0"
measured.workspace = true
prometheus.workspace = true
prost.workspace = true
strum_macros.workspace = true
thiserror.workspace = true
tonic = { workspace = true, default-features = false, features=["codegen", "prost", "transport"] }
tokio = { workspace = true, features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] }

neon_request.rs

@@ -24,6 +24,8 @@ use pageserver_page_api::{self as page_api, SlruKind};
#[allow(clippy::large_enum_variant)]
#[repr(C)]
#[derive(Copy, Clone, Debug)]
#[derive(strum_macros::EnumDiscriminants)]
#[strum_discriminants(derive(measured::FixedCardinalityLabel))]
pub enum NeonIORequest {
Empty,
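
The derive pair above is the heart of the refactor: strum's EnumDiscriminants generates a fieldless mirror enum, NeonIORequestDiscriminants, plus From conversions that strip each variant's payload, and #[strum_discriminants(derive(...))] makes that generated enum additionally derive measured's FixedCardinalityLabel so it can serve as a low-cardinality metric label. A minimal sketch of the pattern with a hypothetical two-variant enum (Demo is illustrative, not part of the commit):

use strum_macros::EnumDiscriminants;

// Hypothetical stand-in for NeonIORequest.
#[derive(EnumDiscriminants)]
#[strum_discriminants(derive(measured::FixedCardinalityLabel))]
enum Demo {
    Empty,
    GetPage { blkno: u32 },
}

fn kind_of(req: &Demo) -> DemoDiscriminants {
    // strum generates the fieldless DemoDiscriminants enum and a
    // From<&Demo> impl, so the conversion drops the payload and keeps
    // only the variant, ready for use as a metric label value.
    req.into()
}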


@@ -16,8 +16,6 @@ use crate::worker_process::lfc_metrics::LfcMetricsCollector;
use pageserver_client_grpc::{PageserverClient, ShardSpec, ShardStripeSize};
use pageserver_page_api as page_api;
use metrics::{IntCounter, IntCounterVec};
use tokio::io::AsyncReadExt;
use tokio_pipe::PipeRead;
use uring_common::buf::IoBuf;
@@ -26,6 +24,7 @@ use measured::MetricGroup;
use measured::metric::MetricEncoding;
use measured::metric::gauge::GaugeState;
use measured::metric::group::Encoding;
use measured::{Gauge, GaugeVec};
use utils::id::{TenantId, TimelineId};
use super::callbacks::{get_request_lsn, notify_proc};
@@ -62,30 +61,33 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
/*** Metrics ***/
pub(crate) lfc_metrics: LfcMetricsCollector,
request_counters: IntCounterVec,
request_rel_size_counter: IntCounter,
request_get_pagev_counter: IntCounter,
request_read_slru_segment_counter: IntCounter,
request_prefetchv_counter: IntCounter,
request_db_size_counter: IntCounter,
request_write_page_counter: IntCounter,
request_rel_extend_counter: IntCounter,
request_rel_zero_extend_counter: IntCounter,
request_rel_create_counter: IntCounter,
request_rel_truncate_counter: IntCounter,
request_rel_unlink_counter: IntCounter,
request_counters: GaugeVec<RequestTypeLabelGroupSet>,
getpage_cache_misses_counter: IntCounter,
getpage_cache_hits_counter: IntCounter,
getpage_cache_misses_counter: Gauge,
getpage_cache_hits_counter: Gauge,
request_nblocks_counters: IntCounterVec,
request_get_pagev_nblocks_counter: IntCounter,
request_prefetchv_nblocks_counter: IntCounter,
request_rel_zero_extend_nblocks_counter: IntCounter,
// For the requests that affect multiple blocks, have separate counters for the # of blocks affected
request_nblocks_counters: GaugeVec<RequestTypeLabelGroupSet>,
#[allow(dead_code)]
allocator_metrics: MyAllocatorCollector,
}
// Define a label group, consisting of 1 or more label values
#[derive(measured::LabelGroup)]
#[label(set = RequestTypeLabelGroupSet)]
struct RequestTypeLabelGroup {
request_type: crate::neon_request::NeonIORequestDiscriminants,
}
impl RequestTypeLabelGroup {
fn from_req(req: &NeonIORequest) -> Self {
RequestTypeLabelGroup {
request_type: req.into(),
}
}
}
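
The #[label(set = RequestTypeLabelGroupSet)] attribute generates the label-set type that the GaugeVec fields above are parameterized over; because the only label is a FixedCardinalityLabel, the set's cardinality is known statically and the vectors can be built with a plain GaugeVec::new(), with no registry or Opts. A sketch of how the pieces fit together, reusing this diff's names and only the inc/inc_by calls that appear below:

fn record_request(
    request_counters: &measured::GaugeVec<RequestTypeLabelGroupSet>,
    request_nblocks_counters: &measured::GaugeVec<RequestTypeLabelGroupSet>,
    request: &NeonIORequest,
    nblocks: i64,
) {
    // One tick per request, labelled by the request's variant.
    request_counters.inc(RequestTypeLabelGroup::from_req(request));
    // Multi-block requests also bump the per-block tally.
    request_nblocks_counters.inc_by(RequestTypeLabelGroup::from_req(request), nblocks);
}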
/// Launch the communicator process's Rust subsystems
pub(super) fn init(
cis: Box<CommunicatorInitStruct>,
@@ -150,54 +152,6 @@ pub(super) fn init(
None
};
let request_counters = IntCounterVec::new(
metrics::core::Opts::new(
"backend_requests_total",
"Number of requests from backends.",
),
&["request_kind"],
)
.unwrap();
let request_rel_size_counter = request_counters.with_label_values(&["rel_size"]);
let request_get_pagev_counter = request_counters.with_label_values(&["get_pagev"]);
let request_read_slru_segment_counter =
request_counters.with_label_values(&["read_slru_segment"]);
let request_prefetchv_counter = request_counters.with_label_values(&["prefetchv"]);
let request_db_size_counter = request_counters.with_label_values(&["db_size"]);
let request_write_page_counter = request_counters.with_label_values(&["write_page"]);
let request_rel_extend_counter = request_counters.with_label_values(&["rel_extend"]);
let request_rel_zero_extend_counter = request_counters.with_label_values(&["rel_zero_extend"]);
let request_rel_create_counter = request_counters.with_label_values(&["rel_create"]);
let request_rel_truncate_counter = request_counters.with_label_values(&["rel_truncate"]);
let request_rel_unlink_counter = request_counters.with_label_values(&["rel_unlink"]);
let getpage_cache_misses_counter = IntCounter::new(
"getpage_cache_misses",
"Number of file cache misses in get_pagev requests.",
)
.unwrap();
let getpage_cache_hits_counter = IntCounter::new(
"getpage_cache_hits",
"Number of file cache hits in get_pagev requests.",
)
.unwrap();
// For the requests that affect multiple blocks, have separate counters for the # of blocks affected
let request_nblocks_counters = IntCounterVec::new(
metrics::core::Opts::new(
"request_nblocks_total",
"Number of blocks in backend requests.",
),
&["request_kind"],
)
.unwrap();
let request_get_pagev_nblocks_counter =
request_nblocks_counters.with_label_values(&["get_pagev"]);
let request_prefetchv_nblocks_counter =
request_nblocks_counters.with_label_values(&["prefetchv"]);
let request_rel_zero_extend_nblocks_counter =
request_nblocks_counters.with_label_values(&["rel_zero_extend"]);
let worker_struct = CommunicatorWorkerProcessStruct {
// Note: it's important to not drop the runtime, or all the tasks are dropped
// too. Including it in the returned struct is one way to keep it around.
@@ -212,26 +166,12 @@ pub(super) fn init(
// metrics
lfc_metrics: LfcMetricsCollector,
request_counters,
request_rel_size_counter,
request_get_pagev_counter,
request_read_slru_segment_counter,
request_prefetchv_counter,
request_db_size_counter,
request_write_page_counter,
request_rel_extend_counter,
request_rel_zero_extend_counter,
request_rel_create_counter,
request_rel_truncate_counter,
request_rel_unlink_counter,
request_counters: GaugeVec::new(),
getpage_cache_misses_counter,
getpage_cache_hits_counter,
getpage_cache_misses_counter: Gauge::new(),
getpage_cache_hits_counter: Gauge::new(),
request_nblocks_counters,
request_get_pagev_nblocks_counter,
request_prefetchv_nblocks_counter,
request_rel_zero_extend_nblocks_counter,
request_nblocks_counters: GaugeVec::new(),
allocator_metrics: MyAllocatorCollector::new(),
};
@@ -389,18 +329,19 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
}
/// Handle one IO request
async fn handle_request(&'static self, req: &'_ NeonIORequest) -> NeonIOResult {
async fn handle_request(&'static self, request: &'_ NeonIORequest) -> NeonIOResult {
let client = self
.client
.as_ref()
.expect("cannot handle requests without client");
match req {
self.request_counters.inc(RequestTypeLabelGroup::from_req(request));
match request {
NeonIORequest::Empty => {
error!("unexpected Empty IO request");
NeonIOResult::Error(0)
}
NeonIORequest::RelSize(req) => {
self.request_rel_size_counter.inc();
let rel = req.reltag();
let _in_progress_guard = self
@@ -452,16 +393,12 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
}
}
NeonIORequest::GetPageV(req) => {
self.request_get_pagev_counter.inc();
self.request_get_pagev_nblocks_counter
.inc_by(req.nblocks as u64);
match self.handle_get_pagev_request(req).await {
Ok(()) => NeonIOResult::GetPageV,
Err(errno) => NeonIOResult::Error(errno),
}
}
NeonIORequest::ReadSlruSegment(req) => {
self.request_read_slru_segment_counter.inc();
let lsn = Lsn(req.request_lsn);
let file_path = req.destination_file_path();
@@ -490,15 +427,12 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
}
}
NeonIORequest::PrefetchV(req) => {
self.request_prefetchv_counter.inc();
self.request_prefetchv_nblocks_counter
.inc_by(req.nblocks as u64);
self.request_nblocks_counters.inc_by(RequestTypeLabelGroup::from_req(request), req.nblocks as i64);
let req = *req;
tokio::spawn(async move { self.handle_prefetchv_request(&req).await });
NeonIOResult::PrefetchVLaunched
}
NeonIORequest::DbSize(req) => {
self.request_db_size_counter.inc();
let _in_progress_guard = self
.in_progress_table
.lock(RequestInProgressKey::Db(req.db_oid), req.request_id)
@@ -530,8 +464,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
// Write requests
NeonIORequest::WritePage(req) => {
self.request_write_page_counter.inc();
let rel = req.reltag();
let _in_progress_guard = self
.in_progress_table
@@ -549,8 +481,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
NeonIOResult::WriteOK
}
NeonIORequest::RelExtend(req) => {
self.request_rel_extend_counter.inc();
let rel = req.reltag();
let _in_progress_guard = self
.in_progress_table
@@ -570,9 +500,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
NeonIOResult::WriteOK
}
NeonIORequest::RelZeroExtend(req) => {
self.request_rel_zero_extend_counter.inc();
self.request_rel_zero_extend_nblocks_counter
.inc_by(req.nblocks as u64);
self.request_nblocks_counters.inc_by(RequestTypeLabelGroup::from_req(request), req.nblocks as i64);
// TODO: need to grab an io-in-progress lock for this? I guess not
// TODO: We could put the empty pages to the cache. Maybe have
@@ -586,23 +514,17 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
NeonIOResult::WriteOK
}
NeonIORequest::RelCreate(req) => {
self.request_rel_create_counter.inc();
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache.remember_rel_size(&req.reltag(), 0, Lsn(req.lsn));
NeonIOResult::WriteOK
}
NeonIORequest::RelTruncate(req) => {
self.request_rel_truncate_counter.inc();
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache
.remember_rel_size(&req.reltag(), req.nblocks, Lsn(req.lsn));
NeonIOResult::WriteOK
}
NeonIORequest::RelUnlink(req) => {
self.request_rel_unlink_counter.inc();
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache.forget_rel(&req.reltag(), None, Lsn(req.lsn));
NeonIOResult::WriteOK
@@ -658,9 +580,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
cache_misses.push((blkno, not_modified_since, dest, in_progress_guard));
}
self.getpage_cache_misses_counter
.inc_by(cache_misses.len() as u64);
.inc_by(cache_misses.len() as i64);
self.getpage_cache_hits_counter
.inc_by(req.nblocks as u64 - cache_misses.len() as u64);
.inc_by(req.nblocks as i64 - cache_misses.len() as i64);
if cache_misses.is_empty() {
return Ok(());
@@ -824,47 +746,21 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
}
}
impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
descs.append(&mut self.request_counters.desc());
descs.append(&mut self.getpage_cache_misses_counter.desc());
descs.append(&mut self.getpage_cache_hits_counter.desc());
descs.append(&mut self.request_nblocks_counters.desc());
if let Some(file_cache) = &self.cache.file_cache {
descs.append(&mut file_cache.desc());
}
descs.append(&mut self.cache.desc());
descs.append(&mut self.allocator_metrics.desc());
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut values = Vec::new();
values.append(&mut self.request_counters.collect());
values.append(&mut self.getpage_cache_misses_counter.collect());
values.append(&mut self.getpage_cache_hits_counter.collect());
values.append(&mut self.request_nblocks_counters.collect());
if let Some(file_cache) = &self.cache.file_cache {
values.append(&mut file_cache.collect());
}
values.append(&mut self.cache.collect());
values.append(&mut self.allocator_metrics.collect());
values
}
}
impl<T> MetricGroup<T> for CommunicatorWorkerProcessStruct<'_>
where
T: Encoding,
GaugeState: MetricEncoding<T>,
{
fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> {
self.lfc_metrics.collect_group_into(enc)
use measured::metric::MetricFamilyEncoding;
use measured::metric::name::MetricName;
self.lfc_metrics.collect_group_into(enc)?;
self.request_counters.collect_family_into(MetricName::from_str("request_counters"), enc)?;
self.request_nblocks_counters.collect_family_into(MetricName::from_str("request_nblocks_counters"), enc)?;
// FIXME: allocator metrics
Ok(())
}
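
Note the shape of the new collection path: metric names are no longer baked into the metric objects at construction time but supplied at encoding time via MetricName::from_str, which is why the GaugeVec::new() calls in init need no Opts. A hedged sketch of how a scrape handler might drive this MetricGroup impl, assuming measured's text::BufferedTextEncoder satisfies the where-clause above and that its finish() returns the rendered buffer (the encoder is not part of this diff):

use measured::MetricGroup;
use measured::text::BufferedTextEncoder;

fn render(worker: &CommunicatorWorkerProcessStruct<'_>) -> bytes::Bytes {
    let mut enc = BufferedTextEncoder::new();
    // Drives collect_group_into above: lfc_metrics first, then the two
    // request vectors, each emitted as a Prometheus text metric family.
    worker
        .collect_group_into(&mut enc)
        .expect("text encoding should not fail");
    enc.finish()
}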
}