From f060537a310bf2fa4a00a905de826f95c170320b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 29 May 2025 16:07:33 +0200 Subject: [PATCH] Add safekeeper reconciler metrics (#12062) Adds two metrics to the storcon that are related to the safekeeper reconciler: * `storage_controller_safkeeper_reconciles_queued` to indicate current queue depth * `storage_controller_safkeeper_reconciles_complete` to indicate the number of complete reconciles Both metrics operate on a per-safekeeper basis (as reconcilers run on a per-safekeeper basis too). These metrics mirror the `storage_controller_pending_reconciles` and `storage_controller_reconcile_complete` metrics, although those are not scoped on a per-pageserver basis but are global for the entire storage controller. Part of #11670 --- storage_controller/src/metrics.rs | 19 ++++++++ .../src/service/safekeeper_reconciler.rs | 45 ++++++++++++++++++- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/storage_controller/src/metrics.rs b/storage_controller/src/metrics.rs index 5ce2fb65e4..ccdbcad139 100644 --- a/storage_controller/src/metrics.rs +++ b/storage_controller/src/metrics.rs @@ -139,6 +139,14 @@ pub(crate) struct StorageControllerMetricGroup { /// HTTP request status counters for handled requests pub(crate) storage_controller_reconcile_long_running: measured::CounterVec, + + /// Indicator of safekeeper reconciler queue depth, broken down by safekeeper, excluding ongoing reconciles. + pub(crate) storage_controller_safkeeper_reconciles_queued: + measured::GaugeVec, + + /// Indicator of completed safekeeper reconciles, broken down by safekeeper. 
+ pub(crate) storage_controller_safkeeper_reconciles_complete: + measured::CounterVec, } impl StorageControllerMetrics { @@ -257,6 +265,17 @@ pub(crate) enum Method { Other, } +#[derive(measured::LabelGroup, Clone)] +#[label(set = SafekeeperReconcilerLabelGroupSet)] +pub(crate) struct SafekeeperReconcilerLabelGroup<'a> { + #[label(dynamic_with = lasso::ThreadedRodeo, default)] + pub(crate) sk_az: &'a str, + #[label(dynamic_with = lasso::ThreadedRodeo, default)] + pub(crate) sk_node_id: &'a str, + #[label(dynamic_with = lasso::ThreadedRodeo, default)] + pub(crate) sk_hostname: &'a str, +} + impl From for Method { fn from(value: hyper::Method) -> Self { if value == hyper::Method::GET { diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs index f756d98c64..fbf0b5c4e3 100644 --- a/storage_controller/src/service/safekeeper_reconciler.rs +++ b/storage_controller/src/service/safekeeper_reconciler.rs @@ -20,7 +20,9 @@ use utils::{ }; use crate::{ - persistence::SafekeeperTimelineOpKind, safekeeper::Safekeeper, + metrics::{METRICS_REGISTRY, SafekeeperReconcilerLabelGroup}, + persistence::SafekeeperTimelineOpKind, + safekeeper::Safekeeper, safekeeper_client::SafekeeperClient, }; @@ -218,7 +220,26 @@ impl ReconcilerHandle { fn schedule_reconcile(&self, req: ScheduleRequest) { let (cancel, token_id) = self.new_token_slot(req.tenant_id, req.timeline_id); let hostname = req.safekeeper.skp.host.clone(); + let sk_az = req.safekeeper.skp.availability_zone_id.clone(); + let sk_node_id = req.safekeeper.get_id().to_string(); + + // We don't have direct access to the queue depth here, so increase it blindly by 1. + // We know that putting into the queue increases the queue depth. The receiver will + // update with the correct value once it processes the next item. 
To avoid races where we + // reduce before we increase, leaving the gauge with a 1 value for a long time, we + // increase it before putting into the queue. + let queued_gauge = &METRICS_REGISTRY + .metrics_group + .storage_controller_safkeeper_reconciles_queued; + let label_group = SafekeeperReconcilerLabelGroup { + sk_az: &sk_az, + sk_node_id: &sk_node_id, + sk_hostname: &hostname, + }; + queued_gauge.inc(label_group.clone()); + if let Err(err) = self.tx.send((req, cancel, token_id)) { + queued_gauge.set(label_group, 0); tracing::info!("scheduling request onto {hostname} returned error: {err}"); } } @@ -283,6 +304,18 @@ impl SafekeeperReconciler { continue; } + let queued_gauge = &METRICS_REGISTRY + .metrics_group + .storage_controller_safkeeper_reconciles_queued; + queued_gauge.set( + SafekeeperReconcilerLabelGroup { + sk_az: &req.safekeeper.skp.availability_zone_id, + sk_node_id: &req.safekeeper.get_id().to_string(), + sk_hostname: &req.safekeeper.skp.host, + }, + self.rx.len() as i64, + ); + tokio::task::spawn(async move { let kind = req.kind; let tenant_id = req.tenant_id; @@ -511,6 +544,16 @@ impl SafekeeperReconcilerInner { req.generation, ) .await; + + let complete_counter = &METRICS_REGISTRY + .metrics_group + .storage_controller_safkeeper_reconciles_complete; + complete_counter.inc(SafekeeperReconcilerLabelGroup { + sk_az: &req.safekeeper.skp.availability_zone_id, + sk_node_id: &req.safekeeper.get_id().to_string(), + sk_hostname: &req.safekeeper.skp.host, + }); + if let Err(err) = res { tracing::info!( "couldn't remove reconciliation request onto {} from persistence: {err:?}",