replace prometheus with measured

This commit is contained in:
Conrad Ludgate
2025-07-15 11:05:17 +01:00
parent 1a75cd36d8
commit c9260d7ce0
5 changed files with 92 additions and 127 deletions

2
Cargo.lock generated
View File

@@ -1293,7 +1293,7 @@ dependencies = [
"axum",
"cbindgen",
"http 1.1.0",
"metrics",
"measured",
"tokio",
"tracing",
"tracing-subscriber",

View File

@@ -19,7 +19,7 @@ tokio = { version = "1.43.1", features = ["macros", "net", "io-util", "rt", "rt-
tracing.workspace = true
tracing-subscriber.workspace = true
metrics.workspace = true
measured.workspace = true
utils.workspace = true
workspace_hack = { version = "0.1", path = "../../../workspace_hack" }

View File

@@ -1,91 +1,89 @@
use metrics::{IntGauge, IntGaugeVec};
use measured::{
FixedCardinalityLabel, Gauge, GaugeVec, LabelGroup, MetricGroup,
label::{LabelName, LabelValue, StaticLabelSet},
metric::{MetricEncoding, gauge::GaugeState, group::Encoding},
};
use super::callbacks::callback_get_lfc_metrics;
pub(crate) struct LfcMetricsCollector {
lfc_cache_size_limit: IntGauge,
lfc_hits: IntGauge,
lfc_misses: IntGauge,
lfc_used: IntGauge,
lfc_writes: IntGauge,
lfc_approximate_working_set_size_windows_vec: IntGaugeVec,
lfc_approximate_working_set_size_windows: [IntGauge; 60],
}
pub(crate) struct LfcMetricsCollector;
impl LfcMetricsCollector {
pub fn new() -> LfcMetricsCollector {
let lfc_approximate_working_set_size_windows_vec = IntGaugeVec::new(
metrics::opts!(
"lfc_approximate_working_set_size_windows",
"Approximate working set size in pages of 8192 bytes",
),
&["duration_seconds"],
)
.unwrap();
let lfc_approximate_working_set_size_windows: [IntGauge; 60] = (1..=60)
.map(|minutes| {
lfc_approximate_working_set_size_windows_vec
.with_label_values(&[&(minutes * 60).to_string()])
})
.collect::<Vec<_>>()
.try_into()
.unwrap();
LfcMetricsCollector {
lfc_cache_size_limit: IntGauge::new(
"lfc_cache_size_limit",
"LFC cache size limit in bytes",
)
.unwrap(),
lfc_hits: IntGauge::new("lfc_hits", "LFC cache hits").unwrap(),
lfc_misses: IntGauge::new("lfc_misses", "LFC cache misses").unwrap(),
lfc_used: IntGauge::new("lfc_used", "LFC chunks used (chunk = 1MB)").unwrap(),
lfc_writes: IntGauge::new("lfc_writes", "LFC cache writes").unwrap(),
lfc_approximate_working_set_size_windows_vec,
lfc_approximate_working_set_size_windows,
}
pub(crate) fn new() -> Self {
Self
}
}
impl metrics::core::Collector for LfcMetricsCollector {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
// Group of LFC gauges that is built for a single collection pass: the visible
// `collect_group_into` implementation creates a fresh instance with `new()`,
// sets every gauge from `callback_get_lfc_metrics()`, and then encodes it.
//
// NOTE(review): the `///` field comments below repeat the help strings that the
// removed `IntGauge::new(name, help)` calls passed explicitly, so they
// presumably supply the exported help text — confirm before rewording them.
#[derive(MetricGroup)]
#[metric(new())]
struct LfcMetricsGroup {
/// LFC cache size limit in bytes
lfc_cache_size_limit: Gauge,
/// LFC cache hits
lfc_hits: Gauge,
/// LFC cache misses
lfc_misses: Gauge,
/// LFC chunks used (chunk = 1MB)
lfc_used: Gauge,
/// LFC cache writes
lfc_writes: Gauge,
/// Approximate working set size in pages of 8192 bytes
// Keyed by the `MinuteAsSeconds` label and initialised with `GaugeVec::dense()`.
#[metric(init = GaugeVec::dense())]
lfc_approximate_working_set_size_windows: GaugeVec<StaticLabelSet<MinuteAsSeconds>>,
}
descs.append(&mut self.lfc_cache_size_limit.desc());
descs.append(&mut self.lfc_hits.desc());
descs.append(&mut self.lfc_misses.desc());
descs.append(&mut self.lfc_used.desc());
descs.append(&mut self.lfc_writes.desc());
descs.append(&mut self.lfc_approximate_working_set_size_windows_vec.desc());
impl<T: Encoding> MetricGroup<T> for LfcMetricsCollector
where
GaugeState: MetricEncoding<T>,
{
fn collect_group_into(&self, enc: &mut T) -> Result<(), <T as Encoding>::Err> {
let g = LfcMetricsGroup::new();
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut values = Vec::new();
// update the gauges
let lfc_metrics = callback_get_lfc_metrics();
self.lfc_cache_size_limit
.set(lfc_metrics.lfc_cache_size_limit);
self.lfc_hits.set(lfc_metrics.lfc_hits);
self.lfc_misses.set(lfc_metrics.lfc_misses);
self.lfc_used.set(lfc_metrics.lfc_used);
self.lfc_writes.set(lfc_metrics.lfc_writes);
g.lfc_cache_size_limit.set(lfc_metrics.lfc_cache_size_limit);
g.lfc_hits.set(lfc_metrics.lfc_hits);
g.lfc_misses.set(lfc_metrics.lfc_misses);
g.lfc_used.set(lfc_metrics.lfc_used);
g.lfc_writes.set(lfc_metrics.lfc_writes);
for i in 0..60 {
let val = lfc_metrics.lfc_approximate_working_set_size_windows[i];
self.lfc_approximate_working_set_size_windows[i].set(val);
g.lfc_approximate_working_set_size_windows
.set(MinuteAsSeconds(i), val);
}
values.append(&mut self.lfc_cache_size_limit.collect());
values.append(&mut self.lfc_hits.collect());
values.append(&mut self.lfc_misses.collect());
values.append(&mut self.lfc_used.collect());
values.append(&mut self.lfc_writes.collect());
values.append(&mut self.lfc_approximate_working_set_size_windows_vec.collect());
values
g.collect_group_into(enc)
}
}
/// Label identifying one approximate-working-set-size window.
///
/// Stores the window index in the range `0..60` and renders it as the window
/// length in seconds (60, 120, 180, ..., 3600): index `i` is the
/// `(i + 1)`-minute window. These are the same `duration_seconds` values that
/// the previous `(1..=60)` / `minutes * 60` registration produced.
#[derive(Clone, Copy)]
struct MinuteAsSeconds(usize);

impl FixedCardinalityLabel for MinuteAsSeconds {
    /// Number of distinct windows, one per minute index.
    fn cardinality() -> usize {
        60
    }

    /// Returns the slot for this label; the stored index is used directly.
    fn encode(&self) -> usize {
        self.0
    }

    /// Rebuilds the label from a slot index as returned by `encode`.
    fn decode(value: usize) -> Self {
        Self(value)
    }
}

impl LabelValue for MinuteAsSeconds {
    /// Writes the window length in seconds as an integer label value.
    fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
        // Index 0 is the one-minute window, so shift by one before scaling.
        // Writing `self.0 * 60` would relabel every window (0..=3540 instead of
        // 60..=3600). The index is expected to stay below `cardinality()`, so
        // the widening cast cannot truncate.
        v.write_int((self.0 as i64 + 1) * 60)
    }
}
impl LabelGroup for MinuteAsSeconds {
    /// Emits this label as a single `duration_seconds` name/value pair.
    fn visit_values(&self, visitor: &mut impl measured::label::LabelGroupVisitor) {
        let name = LabelName::from_str("duration_seconds");
        visitor.write_value(name, self);
    }
}

View File

@@ -2,10 +2,13 @@ use std::str::FromStr as _;
use crate::worker_process::lfc_metrics::LfcMetricsCollector;
use measured::MetricGroup;
use utils::id::{TenantId, TimelineId};
#[derive(MetricGroup)]
pub struct CommunicatorWorkerProcessStruct {
/*** Metrics ***/
#[metric(flatten)]
pub(crate) lfc_metrics: LfcMetricsCollector,
}
@@ -21,20 +24,3 @@ pub(super) async fn init(
lfc_metrics: LfcMetricsCollector::new(),
}
}
impl metrics::core::Collector for CommunicatorWorkerProcessStruct {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
descs.append(&mut self.lfc_metrics.desc());
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut values = Vec::new();
values.append(&mut self.lfc_metrics.collect());
values
}
}

View File

@@ -13,8 +13,8 @@ use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use metrics::proto::MetricFamily;
use metrics::{Encoder, TextEncoder};
use measured::MetricGroup;
use measured::text::BufferedTextEncoder;
use std::path::PathBuf;
@@ -47,10 +47,8 @@ impl CommunicatorWorkerProcessStruct {
/// Expose all Prometheus metrics.
async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> Response {
use metrics::core::Collector;
let metrics = state.collect();
tracing::trace!("/metrics requested");
metrics_to_response(metrics).await
metrics_to_response(&state).await
}
/// Expose Prometheus metrics, for use by the autoscaling agent.
@@ -59,11 +57,8 @@ async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> R
async fn get_autoscaling_metrics(
State(state): State<&CommunicatorWorkerProcessStruct>,
) -> Response {
use metrics::core::Collector;
let metrics = state.lfc_metrics.collect();
tracing::trace!("/autoscaling_metrics requested");
metrics_to_response(metrics).await
// Log the path this handler serves so trace output can tell it apart from `/metrics`.
tracing::trace!("/autoscaling_metrics requested");
// Only the LFC metrics group is encoded here, not the whole worker-process struct.
metrics_to_response(&state.lfc_metrics).await
}
async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruct>) -> Response {
@@ -71,29 +66,15 @@ async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruc
}
/// Helper function to convert prometheus metrics to a text response
async fn metrics_to_response(metrics: Vec<MetricFamily>) -> Response {
// When we call TextEncoder::encode() below, it will immediately return an
// error if a metric family has no metrics, so we need to preemptively
// filter out metric families with no metrics.
let metrics = metrics
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect::<Vec<MetricFamily>>();
async fn metrics_to_response(metrics: &(dyn MetricGroup<BufferedTextEncoder> + Sync)) -> Response {
let mut enc = BufferedTextEncoder::new();
metrics
.collect_group_into(&mut enc)
.unwrap_or_else(|never| match never {});
let encoder = TextEncoder::new();
let mut buffer = vec![];
if let Err(e) = encoder.encode(&metrics, &mut buffer) {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(CONTENT_TYPE, "application/text")
.body(Body::from(e.to_string()))
.unwrap()
} else {
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}
// Encoding into the buffered encoder cannot fail (its error is matched as an
// uninhabited value above), so this is always the success path: answer 200
// rather than the 500 that belonged to the removed error branch.
// NOTE(review): the content type is the Prometheus text exposition type that
// the removed `TextEncoder::format_type()` is believed to have reported —
// confirm scrapers do not depend on another value.
Response::builder()
    .status(StatusCode::OK)
    .header(CONTENT_TYPE, "text/plain; version=0.0.4")
    .body(Body::from(enc.finish()))
    .expect("static status and header values always form a valid response")
}