proxy: Define service_info metric showing the run state (#12749)

## Problem

Monitoring dashboards show aggregates over all proxy instances, including
terminating ones. This can skew the aggregated values or make graphs harder
to read, and alerts have to be tuned to ignore certain signals coming from
terminating proxies.

## Summary of changes

Add a `service_info` metric, currently with a single label, `state`,
indicating whether an instance is in state `init`, `running`, or
`terminating`. The metric can be joined with other metrics to filter the
presented time series.
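
For orientation, a minimal sketch of how the label is driven; the calls below mirror the ones added in the diffs that follow (the exported metric name, including any namespace prefix, depends on the exporter setup):

```rust
// Lifecycle of the `state` label as wired up in this PR:
// - the metric starts out as state="init" (the enum's #[default] variant),
// - each binary's run() switches it to "running" once startup has completed,
// - the shared signal handler switches it to "terminating" on SIGINT/SIGTERM.

// After startup has completed:
Metrics::get()
    .service
    .info
    .set_label(ServiceInfo::running());

// When shutdown begins (signal handler):
Metrics::get()
    .service
    .info
    .set_label(ServiceInfo::terminating());
```

On the dashboard side, other series can then be joined against something like `service_info{state="running"}` (exact metric name and join labels depend on the deployment) to drop terminating instances from aggregates and alerts.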

Folke Behrens authored on 2025-07-25 20:27:21 +02:00 (committed by GitHub)
parent ac8f44c70e · commit 25718e324a
6 changed files with 75 additions and 5 deletions

```diff
@@ -129,6 +129,12 @@ impl<L: LabelGroup> InfoMetric<L> {
     }
 }
 
+impl<L: LabelGroup + Default> Default for InfoMetric<L, GaugeState> {
+    fn default() -> Self {
+        InfoMetric::new(L::default())
+    }
+}
+
 impl<L: LabelGroup, M: MetricType<Metadata = ()>> InfoMetric<L, M> {
     pub fn with_metric(label: L, metric: M) -> Self {
         Self {
```
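
The new `Default` impl above lets an `InfoMetric` over a defaultable label group be defaulted itself, so it can sit in a `#[derive(Default)]` metric group without an explicit initializer. That is how the proxy uses it in `ServiceMetrics` further down; roughly:

```rust
// Thanks to `impl Default for InfoMetric<L, GaugeState>` above, this derive
// compiles without an #[metric(init = ...)] attribute; the info metric starts
// with ServiceInfo::default(), i.e. state = ServiceState::Init.
#[derive(MetricGroup, Default)]
pub struct ServiceMetrics {
    pub info: InfoMetric<ServiceInfo>,
}
```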

```diff
@@ -29,7 +29,7 @@ use crate::config::{
 };
 use crate::control_plane::locks::ApiLocks;
 use crate::http::health_server::AppMetrics;
-use crate::metrics::{Metrics, ThreadPoolMetrics};
+use crate::metrics::{Metrics, ServiceInfo, ThreadPoolMetrics};
 use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo};
 use crate::scram::threadpool::ThreadPool;
 use crate::serverless::cancel_set::CancelSet;
@@ -207,6 +207,11 @@ pub async fn run() -> anyhow::Result<()> {
         endpoint_rate_limiter,
     );
 
+    Metrics::get()
+        .service
+        .info
+        .set_label(ServiceInfo::running());
+
     match futures::future::select(pin!(maintenance_tasks.join_next()), pin!(task)).await {
         // exit immediately on maintenance task completion
         Either::Left((Some(res), _)) => match crate::error::flatten_err(res)? {},
```

```diff
@@ -26,7 +26,7 @@ use utils::project_git_version;
 use utils::sentry_init::init_sentry;
 
 use crate::context::RequestContext;
-use crate::metrics::{Metrics, ThreadPoolMetrics};
+use crate::metrics::{Metrics, ServiceInfo, ThreadPoolMetrics};
 use crate::pglb::TlsRequired;
 use crate::pqproto::FeStartupPacket;
 use crate::protocol2::ConnectionInfo;
@@ -135,6 +135,12 @@ pub async fn run() -> anyhow::Result<()> {
         cancellation_token.clone(),
     ))
     .map(crate::error::flatten_err);
+
+    Metrics::get()
+        .service
+        .info
+        .set_label(ServiceInfo::running());
+
     let signals_task = tokio::spawn(crate::signals::handle(cancellation_token, || {}));
 
     // the signal task cant ever succeed.
```

```diff
@@ -40,7 +40,7 @@ use crate::config::{
 };
 use crate::context::parquet::ParquetUploadArgs;
 use crate::http::health_server::AppMetrics;
-use crate::metrics::Metrics;
+use crate::metrics::{Metrics, ServiceInfo};
 use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo, WakeComputeRateLimiter};
 use crate::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
 use crate::redis::kv_ops::RedisKVClient;
@@ -590,6 +590,11 @@ pub async fn run() -> anyhow::Result<()> {
         }
     }
 
+    Metrics::get()
+        .service
+        .info
+        .set_label(ServiceInfo::running());
+
     let maintenance = loop {
         // get one complete task
         match futures::future::select(
```

```diff
@@ -2,7 +2,8 @@ use std::sync::{Arc, OnceLock};
 
 use lasso::ThreadedRodeo;
 use measured::label::{
-    FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
+    FixedCardinalitySet, LabelGroupSet, LabelGroupVisitor, LabelName, LabelSet, LabelValue,
+    StaticLabelSet,
 };
 use measured::metric::histogram::Thresholds;
 use measured::metric::name::MetricName;
@@ -10,7 +11,7 @@ use measured::{
     Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
     MetricGroup,
 };
-use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec};
+use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
 use tokio::time::{self, Instant};
 
 use crate::control_plane::messages::ColdStartInfo;
@@ -25,6 +26,9 @@ pub struct Metrics {
 
     #[metric(namespace = "wake_compute_lock")]
     pub wake_compute_lock: ApiLockMetrics,
+
+    #[metric(namespace = "service")]
+    pub service: ServiceMetrics,
 }
 
 static SELF: OnceLock<Metrics> = OnceLock::new();
@@ -660,3 +664,43 @@ pub struct ThreadPoolMetrics {
     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
     pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
 }
+
+#[derive(MetricGroup, Default)]
+pub struct ServiceMetrics {
+    pub info: InfoMetric<ServiceInfo>,
+}
+
+#[derive(Default)]
+pub struct ServiceInfo {
+    pub state: ServiceState,
+}
+
+impl ServiceInfo {
+    pub const fn running() -> Self {
+        ServiceInfo {
+            state: ServiceState::Running,
+        }
+    }
+
+    pub const fn terminating() -> Self {
+        ServiceInfo {
+            state: ServiceState::Terminating,
+        }
+    }
+}
+
+impl LabelGroup for ServiceInfo {
+    fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
+        const STATE: &LabelName = LabelName::from_str("state");
+        v.write_value(STATE, &self.state);
+    }
+}
+
+#[derive(FixedCardinalityLabel, Clone, Copy, Debug, Default)]
+#[label(singleton = "state")]
+pub enum ServiceState {
+    #[default]
+    Init,
+    Running,
+    Terminating,
+}
```
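
A note on the label plumbing above: `ServiceInfo` implements `LabelGroup` by hand so the info metric carries a single `state` label, and `ServiceState` derives `FixedCardinalityLabel`, so the label value is presumably the lowercased variant name (`init`, `running`, `terminating`). Because `Init` is the `#[default]` variant, an instance reports `state="init"` from process start until its `run()` reaches `set_label(ServiceInfo::running())`, which also makes instances stuck in initialization visible.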

```diff
@@ -4,6 +4,8 @@ use anyhow::bail;
 use tokio_util::sync::CancellationToken;
 use tracing::{info, warn};
 
+use crate::metrics::{Metrics, ServiceInfo};
+
 /// Handle unix signals appropriately.
 pub async fn handle<F>(
     token: CancellationToken,
@@ -28,10 +30,12 @@ where
             // Shut down the whole application.
             _ = interrupt.recv() => {
                 warn!("received SIGINT, exiting immediately");
+                Metrics::get().service.info.set_label(ServiceInfo::terminating());
                 bail!("interrupted");
             }
             _ = terminate.recv() => {
                 warn!("received SIGTERM, shutting down once all existing connections have closed");
+                Metrics::get().service.info.set_label(ServiceInfo::terminating());
                 token.cancel();
             }
         }
```
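
One behavioral note on the hunk above: on SIGINT the label is flipped right before `bail!` aborts the process, so a scrape is unlikely to ever observe it; the `terminating` state mainly matters on the SIGTERM path, where the process keeps running (and keeps being scraped) while existing connections drain.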