From c9260d7ce063ca6ce02b722cfa9b4bff5d005859 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 15 Jul 2025 11:05:17 +0100 Subject: [PATCH] replace prometheus with measured --- Cargo.lock | 2 +- pgxn/neon/communicator/Cargo.toml | 2 +- .../src/worker_process/lfc_metrics.rs | 146 +++++++++--------- .../src/worker_process/main_loop.rs | 20 +-- .../src/worker_process/metrics_exporter.rs | 49 ++---- 5 files changed, 92 insertions(+), 127 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53bce59652..9fc48d4216 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1293,7 +1293,7 @@ dependencies = [ "axum", "cbindgen", "http 1.1.0", - "metrics", + "measured", "tokio", "tracing", "tracing-subscriber", diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml index c5b5804ebc..df5d63340b 100644 --- a/pgxn/neon/communicator/Cargo.toml +++ b/pgxn/neon/communicator/Cargo.toml @@ -19,7 +19,7 @@ tokio = { version = "1.43.1", features = ["macros", "net", "io-util", "rt", "rt- tracing.workspace = true tracing-subscriber.workspace = true -metrics.workspace = true +measured.workspace = true utils.workspace = true workspace_hack = { version = "0.1", path = "../../../workspace_hack" } diff --git a/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs b/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs index fae9cc745f..34cf8f067e 100644 --- a/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs +++ b/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs @@ -1,91 +1,89 @@ -use metrics::{IntGauge, IntGaugeVec}; +use measured::{ + FixedCardinalityLabel, Gauge, GaugeVec, LabelGroup, MetricGroup, + label::{LabelName, LabelValue, StaticLabelSet}, + metric::{MetricEncoding, gauge::GaugeState, group::Encoding}, +}; use super::callbacks::callback_get_lfc_metrics; -pub(crate) struct LfcMetricsCollector { - lfc_cache_size_limit: IntGauge, - lfc_hits: IntGauge, - lfc_misses: IntGauge, - lfc_used: IntGauge, - lfc_writes: IntGauge, - lfc_approximate_working_set_size_windows_vec: IntGaugeVec, - lfc_approximate_working_set_size_windows: [IntGauge; 60], -} +pub(crate) struct LfcMetricsCollector; impl LfcMetricsCollector { - pub fn new() -> LfcMetricsCollector { - let lfc_approximate_working_set_size_windows_vec = IntGaugeVec::new( - metrics::opts!( - "lfc_approximate_working_set_size_windows", - "Approximate working set size in pages of 8192 bytes", - ), - &["duration_seconds"], - ) - .unwrap(); - - let lfc_approximate_working_set_size_windows: [IntGauge; 60] = (1..=60) - .map(|minutes| { - lfc_approximate_working_set_size_windows_vec - .with_label_values(&[&(minutes * 60).to_string()]) - }) - .collect::>() - .try_into() - .unwrap(); - - LfcMetricsCollector { - lfc_cache_size_limit: IntGauge::new( - "lfc_cache_size_limit", - "LFC cache size limit in bytes", - ) - .unwrap(), - lfc_hits: IntGauge::new("lfc_hits", "LFC cache hits").unwrap(), - lfc_misses: IntGauge::new("lfc_misses", "LFC cache misses").unwrap(), - lfc_used: IntGauge::new("lfc_used", "LFC chunks used (chunk = 1MB)").unwrap(), - lfc_writes: IntGauge::new("lfc_writes", "LFC cache writes").unwrap(), - - lfc_approximate_working_set_size_windows_vec, - lfc_approximate_working_set_size_windows, - } + pub(crate) fn new() -> Self { + Self } } -impl metrics::core::Collector for LfcMetricsCollector { - fn desc(&self) -> Vec<&metrics::core::Desc> { - let mut descs = Vec::new(); +#[derive(MetricGroup)] +#[metric(new())] +struct LfcMetricsGroup { + /// LFC cache size limit in bytes + lfc_cache_size_limit: Gauge, + /// LFC cache hits + lfc_hits: Gauge, + /// LFC cache misses + lfc_misses: Gauge, + /// LFC chunks used (chunk = 1MB) + lfc_used: Gauge, + /// LFC cache writes + lfc_writes: Gauge, + /// Approximate working set size in pages of 8192 bytes + #[metric(init = GaugeVec::dense())] + lfc_approximate_working_set_size_windows: GaugeVec>, +} - descs.append(&mut self.lfc_cache_size_limit.desc()); - descs.append(&mut self.lfc_hits.desc()); - descs.append(&mut self.lfc_misses.desc()); - descs.append(&mut self.lfc_used.desc()); - descs.append(&mut self.lfc_writes.desc()); - descs.append(&mut self.lfc_approximate_working_set_size_windows_vec.desc()); +impl MetricGroup for LfcMetricsCollector +where + GaugeState: MetricEncoding, +{ + fn collect_group_into(&self, enc: &mut T) -> Result<(), ::Err> { + let g = LfcMetricsGroup::new(); - descs - } - - fn collect(&self) -> Vec { - let mut values = Vec::new(); - - // update the gauges let lfc_metrics = callback_get_lfc_metrics(); - self.lfc_cache_size_limit - .set(lfc_metrics.lfc_cache_size_limit); - self.lfc_hits.set(lfc_metrics.lfc_hits); - self.lfc_misses.set(lfc_metrics.lfc_misses); - self.lfc_used.set(lfc_metrics.lfc_used); - self.lfc_writes.set(lfc_metrics.lfc_writes); + + g.lfc_cache_size_limit.set(lfc_metrics.lfc_cache_size_limit); + g.lfc_hits.set(lfc_metrics.lfc_hits); + g.lfc_misses.set(lfc_metrics.lfc_misses); + g.lfc_used.set(lfc_metrics.lfc_used); + g.lfc_writes.set(lfc_metrics.lfc_writes); + for i in 0..60 { let val = lfc_metrics.lfc_approximate_working_set_size_windows[i]; - self.lfc_approximate_working_set_size_windows[i].set(val); + g.lfc_approximate_working_set_size_windows + .set(MinuteAsSeconds(i), val); } - values.append(&mut self.lfc_cache_size_limit.collect()); - values.append(&mut self.lfc_hits.collect()); - values.append(&mut self.lfc_misses.collect()); - values.append(&mut self.lfc_used.collect()); - values.append(&mut self.lfc_writes.collect()); - values.append(&mut self.lfc_approximate_working_set_size_windows_vec.collect()); - - values + g.collect_group_into(enc) + } +} + +/// This stores the values in range 0..60, +/// encodes them as seconds (0, 60, 120, 180, ..., 3540) +#[derive(Clone, Copy)] +struct MinuteAsSeconds(usize); + +impl FixedCardinalityLabel for MinuteAsSeconds { + fn cardinality() -> usize { + 60 + } + + fn encode(&self) -> usize { + self.0 + } + + fn decode(value: usize) -> Self { + Self(value) + } +} + +impl LabelValue for MinuteAsSeconds { + fn visit(&self, v: V) -> V::Output { + v.write_int(self.0 as i64 * 60) + } +} + +impl LabelGroup for MinuteAsSeconds { + fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) { + v.write_value(LabelName::from_str("duration_seconds"), self); } } diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 6247abd46e..a84e9d5277 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -2,10 +2,13 @@ use std::str::FromStr as _; use crate::worker_process::lfc_metrics::LfcMetricsCollector; +use measured::MetricGroup; use utils::id::{TenantId, TimelineId}; +#[derive(MetricGroup)] pub struct CommunicatorWorkerProcessStruct { /*** Metrics ***/ + #[metric(flatten)] pub(crate) lfc_metrics: LfcMetricsCollector, } @@ -21,20 +24,3 @@ pub(super) async fn init( lfc_metrics: LfcMetricsCollector::new(), } } - -impl metrics::core::Collector for CommunicatorWorkerProcessStruct { - fn desc(&self) -> Vec<&metrics::core::Desc> { - let mut descs = Vec::new(); - - descs.append(&mut self.lfc_metrics.desc()); - - descs - } - fn collect(&self) -> Vec { - let mut values = Vec::new(); - - values.append(&mut self.lfc_metrics.collect()); - - values - } -} diff --git a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs index 84cceb19df..4259a90979 100644 --- a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs +++ b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs @@ -13,8 +13,8 @@ use axum::response::Response; use http::StatusCode; use http::header::CONTENT_TYPE; -use metrics::proto::MetricFamily; -use metrics::{Encoder, TextEncoder}; +use measured::MetricGroup; +use measured::text::BufferedTextEncoder; use std::path::PathBuf; @@ -47,10 +47,8 @@ impl CommunicatorWorkerProcessStruct { /// Expose all Prometheus metrics. async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> Response { - use metrics::core::Collector; - let metrics = state.collect(); tracing::trace!("/metrics requested"); - metrics_to_response(metrics).await + metrics_to_response(&state).await } /// Expose Prometheus metrics, for use by the autoscaling agent. @@ -59,11 +57,8 @@ async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> R async fn get_autoscaling_metrics( State(state): State<&CommunicatorWorkerProcessStruct>, ) -> Response { - use metrics::core::Collector; - let metrics = state.lfc_metrics.collect(); - - tracing::trace!("/autoscaling_metrics requested"); - metrics_to_response(metrics).await + tracing::trace!("/metrics requested"); + metrics_to_response(&state.lfc_metrics).await } async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruct>) -> Response { @@ -71,29 +66,15 @@ async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruc } /// Helper function to convert prometheus metrics to a text response -async fn metrics_to_response(metrics: Vec) -> Response { - // When we call TextEncoder::encode() below, it will immediately return an - // error if a metric family has no metrics, so we need to preemptively - // filter out metric families with no metrics. - let metrics = metrics - .into_iter() - .filter(|m| !m.get_metric().is_empty()) - .collect::>(); +async fn metrics_to_response(metrics: &(dyn MetricGroup + Sync)) -> Response { + let mut enc = BufferedTextEncoder::new(); + metrics + .collect_group_into(&mut enc) + .unwrap_or_else(|never| match never {}); - let encoder = TextEncoder::new(); - let mut buffer = vec![]; - - if let Err(e) = encoder.encode(&metrics, &mut buffer) { - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .header(CONTENT_TYPE, "application/text") - .body(Body::from(e.to_string())) - .unwrap() - } else { - Response::builder() - .status(StatusCode::OK) - .header(CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) - .unwrap() - } + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header(CONTENT_TYPE, "application/text") + .body(Body::from(enc.finish())) + .unwrap() }