diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs
index 41873cdcd6..6cf27abcaf 100644
--- a/libs/metrics/src/lib.rs
+++ b/libs/metrics/src/lib.rs
@@ -129,6 +129,12 @@ impl<L: LabelGroup> InfoMetric<L> {
     }
 }
 
+impl<L: LabelGroup + Default> Default for InfoMetric<L> {
+    fn default() -> Self {
+        InfoMetric::new(L::default())
+    }
+}
+
 impl<L: LabelGroup, M> InfoMetric<L, M> {
     pub fn with_metric(label: L, metric: M) -> Self {
         Self {
diff --git a/proxy/src/binary/local_proxy.rs b/proxy/src/binary/local_proxy.rs
index 7b9012dc69..3a17e6da83 100644
--- a/proxy/src/binary/local_proxy.rs
+++ b/proxy/src/binary/local_proxy.rs
@@ -29,7 +29,7 @@ use crate::config::{
 };
 use crate::control_plane::locks::ApiLocks;
 use crate::http::health_server::AppMetrics;
-use crate::metrics::{Metrics, ThreadPoolMetrics};
+use crate::metrics::{Metrics, ServiceInfo, ThreadPoolMetrics};
 use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo};
 use crate::scram::threadpool::ThreadPool;
 use crate::serverless::cancel_set::CancelSet;
@@ -207,6 +207,11 @@ pub async fn run() -> anyhow::Result<()> {
         endpoint_rate_limiter,
     );
 
+    Metrics::get()
+        .service
+        .info
+        .set_label(ServiceInfo::running());
+
     match futures::future::select(pin!(maintenance_tasks.join_next()), pin!(task)).await {
         // exit immediately on maintenance task completion
         Either::Left((Some(res), _)) => match crate::error::flatten_err(res)? {},
diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs
index f3782312dc..b22b413b74 100644
--- a/proxy/src/binary/pg_sni_router.rs
+++ b/proxy/src/binary/pg_sni_router.rs
@@ -26,7 +26,7 @@ use utils::project_git_version;
 use utils::sentry_init::init_sentry;
 
 use crate::context::RequestContext;
-use crate::metrics::{Metrics, ThreadPoolMetrics};
+use crate::metrics::{Metrics, ServiceInfo, ThreadPoolMetrics};
 use crate::pglb::TlsRequired;
 use crate::pqproto::FeStartupPacket;
 use crate::protocol2::ConnectionInfo;
@@ -135,6 +135,12 @@ pub async fn run() -> anyhow::Result<()> {
         cancellation_token.clone(),
     ))
     .map(crate::error::flatten_err);
+
+    Metrics::get()
+        .service
+        .info
+        .set_label(ServiceInfo::running());
+
     let signals_task = tokio::spawn(crate::signals::handle(cancellation_token, || {}));
 
     // the signal task cant ever succeed.
diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs
index aa0e1b8f98..e912bebd67 100644
--- a/proxy/src/binary/proxy.rs
+++ b/proxy/src/binary/proxy.rs
@@ -40,7 +40,7 @@ use crate::config::{
 };
 use crate::context::parquet::ParquetUploadArgs;
 use crate::http::health_server::AppMetrics;
-use crate::metrics::Metrics;
+use crate::metrics::{Metrics, ServiceInfo};
 use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo, WakeComputeRateLimiter};
 use crate::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
 use crate::redis::kv_ops::RedisKVClient;
@@ -590,6 +590,11 @@ pub async fn run() -> anyhow::Result<()> {
         }
     }
 
+    Metrics::get()
+        .service
+        .info
+        .set_label(ServiceInfo::running());
+
     let maintenance = loop {
         // get one complete task
         match futures::future::select(
diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs
index 7524133093..869bce32f2 100644
--- a/proxy/src/metrics.rs
+++ b/proxy/src/metrics.rs
@@ -2,7 +2,8 @@ use std::sync::{Arc, OnceLock};
 
 use lasso::ThreadedRodeo;
 use measured::label::{
-    FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
+    FixedCardinalitySet, LabelGroupSet, LabelGroupVisitor, LabelName, LabelSet, LabelValue,
+    StaticLabelSet,
 };
 use measured::metric::histogram::Thresholds;
 use measured::metric::name::MetricName;
@@ -10,7 +11,7 @@ use measured::{
     Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
     MetricGroup,
 };
-use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec};
+use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
 use tokio::time::{self, Instant};
 
 use crate::control_plane::messages::ColdStartInfo;
@@ -25,6 +26,9 @@ pub struct Metrics {
 
     #[metric(namespace = "wake_compute_lock")]
     pub wake_compute_lock: ApiLockMetrics,
+
+    #[metric(namespace = "service")]
+    pub service: ServiceMetrics,
 }
 
 static SELF: OnceLock<Metrics> = OnceLock::new();
@@ -660,3 +664,43 @@ pub struct ThreadPoolMetrics {
     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
     pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
 }
+
+#[derive(MetricGroup, Default)]
+pub struct ServiceMetrics {
+    pub info: InfoMetric<ServiceInfo>,
+}
+
+#[derive(Default)]
+pub struct ServiceInfo {
+    pub state: ServiceState,
+}
+
+impl ServiceInfo {
+    pub const fn running() -> Self {
+        ServiceInfo {
+            state: ServiceState::Running,
+        }
+    }
+
+    pub const fn terminating() -> Self {
+        ServiceInfo {
+            state: ServiceState::Terminating,
+        }
+    }
+}
+
+impl LabelGroup for ServiceInfo {
+    fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
+        const STATE: &LabelName = LabelName::from_str("state");
+        v.write_value(STATE, &self.state);
+    }
+}
+
+#[derive(FixedCardinalityLabel, Clone, Copy, Debug, Default)]
+#[label(singleton = "state")]
+pub enum ServiceState {
+    #[default]
+    Init,
+    Running,
+    Terminating,
+}
diff --git a/proxy/src/signals.rs b/proxy/src/signals.rs
index 32b2344a1c..63ef7d1061 100644
--- a/proxy/src/signals.rs
+++ b/proxy/src/signals.rs
@@ -4,6 +4,8 @@ use anyhow::bail;
 use tokio_util::sync::CancellationToken;
 use tracing::{info, warn};
 
+use crate::metrics::{Metrics, ServiceInfo};
+
 /// Handle unix signals appropriately.
 pub async fn handle(
     token: CancellationToken,
@@ -28,10 +30,12 @@ where
         // Shut down the whole application.
         _ = interrupt.recv() => {
             warn!("received SIGINT, exiting immediately");
+            Metrics::get().service.info.set_label(ServiceInfo::terminating());
             bail!("interrupted");
         }
         _ = terminate.recv() => {
             warn!("received SIGTERM, shutting down once all existing connections have closed");
+            Metrics::get().service.info.set_label(ServiceInfo::terminating());
             token.cancel();
         }
     }
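
Note (reviewer addition, not part of the patch): a minimal sketch of the lifecycle this change implements, assuming the ServiceInfo/ServiceState API introduced in proxy/src/metrics.rs. The helper function names below are hypothetical, and the exact exported metric name depends on how InfoMetric renders under the "service" namespace.

    use crate::metrics::{Metrics, ServiceInfo};

    fn startup_complete() {
        // The label defaults to ServiceState::Init when Metrics is first constructed
        // (InfoMetric's new Default impl uses L::default()). Once listeners and
        // maintenance tasks are up, each binary flips the state to Running.
        Metrics::get().service.info.set_label(ServiceInfo::running());
    }

    fn on_shutdown_signal() {
        // The shared signal handler flips the state to Terminating on SIGINT/SIGTERM,
        // so scrapes taken while connections drain report the shutdown phase.
        Metrics::get().service.info.set_label(ServiceInfo::terminating());
    }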