From 25718e324a49c00c4bb11e4cc298d7b3e2665107 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Fri, 25 Jul 2025 20:27:21 +0200 Subject: [PATCH] proxy: Define service_info metric showing the run state (#12749) ## Problem Monitoring dashboards show aggregates of all proxy instances, including terminating ones. This can skew the results or make graphs less readable. Also, alerts must be tuned to ignore certain signals from terminating proxies. ## Summary of changes Add a `service_info` metric currently with one label, `state`, showing if an instance is in state `init`, `running`, or `terminating`. The metric can be joined with other metrics to filter the presented time series. --- libs/metrics/src/lib.rs | 6 ++++ proxy/src/binary/local_proxy.rs | 7 ++++- proxy/src/binary/pg_sni_router.rs | 8 +++++- proxy/src/binary/proxy.rs | 7 ++++- proxy/src/metrics.rs | 48 +++++++++++++++++++++++++++++-- proxy/src/signals.rs | 4 +++ 6 files changed, 75 insertions(+), 5 deletions(-) diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs index 41873cdcd6..6cf27abcaf 100644 --- a/libs/metrics/src/lib.rs +++ b/libs/metrics/src/lib.rs @@ -129,6 +129,12 @@ impl InfoMetric { } } +impl Default for InfoMetric { + fn default() -> Self { + InfoMetric::new(L::default()) + } +} + impl> InfoMetric { pub fn with_metric(label: L, metric: M) -> Self { Self { diff --git a/proxy/src/binary/local_proxy.rs b/proxy/src/binary/local_proxy.rs index 7b9012dc69..3a17e6da83 100644 --- a/proxy/src/binary/local_proxy.rs +++ b/proxy/src/binary/local_proxy.rs @@ -29,7 +29,7 @@ use crate::config::{ }; use crate::control_plane::locks::ApiLocks; use crate::http::health_server::AppMetrics; -use crate::metrics::{Metrics, ThreadPoolMetrics}; +use crate::metrics::{Metrics, ServiceInfo, ThreadPoolMetrics}; use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo}; use crate::scram::threadpool::ThreadPool; use crate::serverless::cancel_set::CancelSet; @@ -207,6 +207,11 @@ pub async fn run() -> anyhow::Result<()> { endpoint_rate_limiter, ); + Metrics::get() + .service + .info + .set_label(ServiceInfo::running()); + match futures::future::select(pin!(maintenance_tasks.join_next()), pin!(task)).await { // exit immediately on maintenance task completion Either::Left((Some(res), _)) => match crate::error::flatten_err(res)? {}, diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs index f3782312dc..b22b413b74 100644 --- a/proxy/src/binary/pg_sni_router.rs +++ b/proxy/src/binary/pg_sni_router.rs @@ -26,7 +26,7 @@ use utils::project_git_version; use utils::sentry_init::init_sentry; use crate::context::RequestContext; -use crate::metrics::{Metrics, ThreadPoolMetrics}; +use crate::metrics::{Metrics, ServiceInfo, ThreadPoolMetrics}; use crate::pglb::TlsRequired; use crate::pqproto::FeStartupPacket; use crate::protocol2::ConnectionInfo; @@ -135,6 +135,12 @@ pub async fn run() -> anyhow::Result<()> { cancellation_token.clone(), )) .map(crate::error::flatten_err); + + Metrics::get() + .service + .info + .set_label(ServiceInfo::running()); + let signals_task = tokio::spawn(crate::signals::handle(cancellation_token, || {})); // the signal task cant ever succeed. diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index aa0e1b8f98..e912bebd67 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -40,7 +40,7 @@ use crate::config::{ }; use crate::context::parquet::ParquetUploadArgs; use crate::http::health_server::AppMetrics; -use crate::metrics::Metrics; +use crate::metrics::{Metrics, ServiceInfo}; use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo, WakeComputeRateLimiter}; use crate::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider; use crate::redis::kv_ops::RedisKVClient; @@ -590,6 +590,11 @@ pub async fn run() -> anyhow::Result<()> { } } + Metrics::get() + .service + .info + .set_label(ServiceInfo::running()); + let maintenance = loop { // get one complete task match futures::future::select( diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 7524133093..869bce32f2 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -2,7 +2,8 @@ use std::sync::{Arc, OnceLock}; use lasso::ThreadedRodeo; use measured::label::{ - FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet, + FixedCardinalitySet, LabelGroupSet, LabelGroupVisitor, LabelName, LabelSet, LabelValue, + StaticLabelSet, }; use measured::metric::histogram::Thresholds; use measured::metric::name::MetricName; @@ -10,7 +11,7 @@ use measured::{ Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup, MetricGroup, }; -use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec}; +use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric}; use tokio::time::{self, Instant}; use crate::control_plane::messages::ColdStartInfo; @@ -25,6 +26,9 @@ pub struct Metrics { #[metric(namespace = "wake_compute_lock")] pub wake_compute_lock: ApiLockMetrics, + + #[metric(namespace = "service")] + pub service: ServiceMetrics, } static SELF: OnceLock = OnceLock::new(); @@ -660,3 +664,43 @@ pub struct ThreadPoolMetrics { #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))] pub worker_task_skips_total: CounterVec, } + +#[derive(MetricGroup, Default)] +pub struct ServiceMetrics { + pub info: InfoMetric, +} + +#[derive(Default)] +pub struct ServiceInfo { + pub state: ServiceState, +} + +impl ServiceInfo { + pub const fn running() -> Self { + ServiceInfo { + state: ServiceState::Running, + } + } + + pub const fn terminating() -> Self { + ServiceInfo { + state: ServiceState::Terminating, + } + } +} + +impl LabelGroup for ServiceInfo { + fn visit_values(&self, v: &mut impl LabelGroupVisitor) { + const STATE: &LabelName = LabelName::from_str("state"); + v.write_value(STATE, &self.state); + } +} + +#[derive(FixedCardinalityLabel, Clone, Copy, Debug, Default)] +#[label(singleton = "state")] +pub enum ServiceState { + #[default] + Init, + Running, + Terminating, +} diff --git a/proxy/src/signals.rs b/proxy/src/signals.rs index 32b2344a1c..63ef7d1061 100644 --- a/proxy/src/signals.rs +++ b/proxy/src/signals.rs @@ -4,6 +4,8 @@ use anyhow::bail; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; +use crate::metrics::{Metrics, ServiceInfo}; + /// Handle unix signals appropriately. pub async fn handle( token: CancellationToken, @@ -28,10 +30,12 @@ where // Shut down the whole application. _ = interrupt.recv() => { warn!("received SIGINT, exiting immediately"); + Metrics::get().service.info.set_label(ServiceInfo::terminating()); bail!("interrupted"); } _ = terminate.recv() => { warn!("received SIGTERM, shutting down once all existing connections have closed"); + Metrics::get().service.info.set_label(ServiceInfo::terminating()); token.cancel(); } }