From ffeede085e3008616872372ac98edcef573c8677 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 10 Jul 2025 12:58:22 +0100 Subject: [PATCH] libs: move metric collection for pageserver and safekeeper in a background task (#12525) ## Problem Safekeeper and pageserver metrics collection might time out. We've seen this in both hadron and neon. ## Summary of changes This PR moves metrics collection in PS/SK to the background so that we will always get some metrics, despite there may be some delays. Will leave it to the future work to reduce metrics collection time. --------- Co-authored-by: Chen Luo --- libs/http-utils/src/endpoint.rs | 37 ++++++++- libs/pageserver_api/src/config.rs | 2 + libs/utils/src/lib.rs | 2 + libs/utils/src/metrics_collector.rs | 75 +++++++++++++++++++ pageserver/src/bin/pageserver.rs | 41 +++++++++- pageserver/src/config.rs | 6 ++ pageserver/src/http/routes.rs | 7 +- pageserver/src/lib.rs | 12 +++ safekeeper/src/bin/safekeeper.rs | 27 +++++++ safekeeper/src/http/routes.rs | 9 ++- safekeeper/src/lib.rs | 2 + safekeeper/src/wal_backup.rs | 2 +- .../tests/walproposer_sim/safekeeper.rs | 1 + test_runner/fixtures/pageserver/http.py | 2 +- test_runner/fixtures/safekeeper/http.py | 2 +- 15 files changed, 217 insertions(+), 10 deletions(-) create mode 100644 libs/utils/src/metrics_collector.rs diff --git a/libs/http-utils/src/endpoint.rs b/libs/http-utils/src/endpoint.rs index f32ced1180..a61bf8e08a 100644 --- a/libs/http-utils/src/endpoint.rs +++ b/libs/http-utils/src/endpoint.rs @@ -20,6 +20,7 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::io::ReaderStream; use tracing::{Instrument, debug, info, info_span, warn}; use utils::auth::{AuthError, Claims, SwappableJwtAuth}; +use utils::metrics_collector::{METRICS_COLLECTOR, METRICS_STALE_MILLIS}; use crate::error::{ApiError, api_error_handler, route_error_handler}; use crate::request::{get_query_param, parse_query_param}; @@ -250,9 +251,28 @@ impl std::io::Write for ChannelWriter { } } -pub async fn prometheus_metrics_handler(_req: Request) -> Result, ApiError> { +pub async fn prometheus_metrics_handler( + req: Request, + force_metric_collection_on_scrape: bool, +) -> Result, ApiError> { SERVE_METRICS_COUNT.inc(); + // HADRON + let requested_use_latest = parse_query_param(&req, "use_latest")?; + + let use_latest = match requested_use_latest { + None => force_metric_collection_on_scrape, + Some(true) => true, + Some(false) => { + if force_metric_collection_on_scrape { + // We don't cache in this case + true + } else { + false + } + } + }; + let started_at = std::time::Instant::now(); let (tx, rx) = mpsc::channel(1); @@ -277,12 +297,18 @@ pub async fn prometheus_metrics_handler(_req: Request) -> Result) -> Result { tracing::info!( @@ -303,6 +333,7 @@ pub async fn prometheus_metrics_handler(_req: Request) -> Result, #[serde(skip_serializing_if = "Option::is_none")] pub image_layer_generation_large_timeline_threshold: Option, + pub force_metric_collection_on_scrape: bool, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -831,6 +832,7 @@ impl Default for ConfigToml { basebackup_cache_config: None, posthog_config: None, image_layer_generation_large_timeline_threshold: Some(2 * 1024 * 1024 * 1024), + force_metric_collection_on_scrape: true, } } } diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 11f787562c..2b81da017d 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -99,6 +99,8 @@ pub mod elapsed_accum; #[cfg(target_os = "linux")] pub mod linux_socket_ioctl; +pub mod metrics_collector; + // Re-export used in macro. Avoids adding git-version as dep in target crates. #[doc(hidden)] pub use git_version; diff --git a/libs/utils/src/metrics_collector.rs b/libs/utils/src/metrics_collector.rs new file mode 100644 index 0000000000..9e57fcd643 --- /dev/null +++ b/libs/utils/src/metrics_collector.rs @@ -0,0 +1,75 @@ +use std::{ + sync::{Arc, RwLock}, + time::{Duration, Instant}, +}; + +use metrics::{IntGauge, proto::MetricFamily, register_int_gauge}; +use once_cell::sync::Lazy; + +pub static METRICS_STALE_MILLIS: Lazy = Lazy::new(|| { + register_int_gauge!( + "metrics_metrics_stale_milliseconds", + "The current metrics stale time in milliseconds" + ) + .expect("failed to define a metric") +}); + +#[derive(Debug)] +pub struct CollectedMetrics { + pub metrics: Vec, + pub collected_at: Instant, +} + +impl CollectedMetrics { + fn new(metrics: Vec) -> Self { + Self { + metrics, + collected_at: Instant::now(), + } + } +} + +#[derive(Debug)] +pub struct MetricsCollector { + last_collected: RwLock>, +} + +impl MetricsCollector { + pub fn new() -> Self { + Self { + last_collected: RwLock::new(Arc::new(CollectedMetrics::new(vec![]))), + } + } + + #[tracing::instrument(name = "metrics_collector", skip_all)] + pub fn run_once(&self, cache_metrics: bool) -> Arc { + let started = Instant::now(); + let metrics = metrics::gather(); + let collected = Arc::new(CollectedMetrics::new(metrics)); + if cache_metrics { + let mut guard = self.last_collected.write().unwrap(); + *guard = collected.clone(); + } + tracing::info!( + "Collected {} metric families in {} ms", + collected.metrics.len(), + started.elapsed().as_millis() + ); + collected + } + + pub fn last_collected(&self) -> Arc { + self.last_collected.read().unwrap().clone() + } +} + +impl Default for MetricsCollector { + fn default() -> Self { + Self::new() + } +} + +// Interval for metrics collection. Currently hard-coded to be the same as the metrics scape interval from the obs agent +pub static METRICS_COLLECTION_INTERVAL: Duration = Duration::from_secs(30); + +pub static METRICS_COLLECTOR: Lazy = Lazy::new(MetricsCollector::default); diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 78aba25d2e..299fe7e159 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -29,8 +29,8 @@ use pageserver::task_mgr::{ }; use pageserver::tenant::{TenantSharedResources, mgr, secondary}; use pageserver::{ - CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener, http, - page_cache, page_service, task_mgr, virtual_file, + CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener, + MetricsCollectionTask, http, page_cache, page_service, task_mgr, virtual_file, }; use postgres_backend::AuthType; use remote_storage::GenericRemoteStorage; @@ -41,6 +41,7 @@ use tracing_utils::OtelGuard; use utils::auth::{JwtAuth, SwappableJwtAuth}; use utils::crashsafe::syncfs; use utils::logging::TracingErrorLayerEnablement; +use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR}; use utils::sentry_init::init_sentry; use utils::{failpoint_support, logging, project_build_tag, project_git_version, tcp_listener}; @@ -763,6 +764,41 @@ fn start_pageserver( (http_task, https_task) }; + /* BEGIN_HADRON */ + let metrics_collection_task = { + let cancel = shutdown_pageserver.child_token(); + let task = crate::BACKGROUND_RUNTIME.spawn({ + let cancel = cancel.clone(); + let background_jobs_barrier = background_jobs_barrier.clone(); + async move { + if conf.force_metric_collection_on_scrape { + return; + } + + // first wait until background jobs are cleared to launch. + tokio::select! { + _ = cancel.cancelled() => { return; }, + _ = background_jobs_barrier.wait() => {} + }; + let mut interval = tokio::time::interval(METRICS_COLLECTION_INTERVAL); + loop { + tokio::select! { + _ = cancel.cancelled() => { + tracing::info!("cancelled metrics collection task, exiting..."); + break; + }, + _ = interval.tick() => {} + } + tokio::task::spawn_blocking(|| { + METRICS_COLLECTOR.run_once(true); + }); + } + } + }); + MetricsCollectionTask(CancellableTask { task, cancel }) + }; + /* END_HADRON */ + let consumption_metrics_tasks = { let cancel = shutdown_pageserver.child_token(); let task = crate::BACKGROUND_RUNTIME.spawn({ @@ -844,6 +880,7 @@ fn start_pageserver( https_endpoint_listener, page_service, page_service_grpc, + metrics_collection_task, consumption_metrics_tasks, disk_usage_eviction_task, &tenant_manager, diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index f64c5838ff..bb73ae1dd5 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -256,6 +256,10 @@ pub struct PageServerConf { /// Defines what is a big tenant for the purpose of image layer generation. /// See Timeline::should_check_if_image_layers_required pub image_layer_generation_large_timeline_threshold: Option, + + /// Controls whether to collect all metrics on each scrape or to return potentially stale + /// results. + pub force_metric_collection_on_scrape: bool, } /// Token for authentication to safekeepers @@ -437,6 +441,7 @@ impl PageServerConf { timeline_import_config, basebackup_cache_config, image_layer_generation_large_timeline_threshold, + force_metric_collection_on_scrape, } = config_toml; let mut conf = PageServerConf { @@ -496,6 +501,7 @@ impl PageServerConf { timeline_import_config, basebackup_cache_config, image_layer_generation_large_timeline_threshold, + force_metric_collection_on_scrape, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 767bba49e2..ed0a5440cb 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3938,9 +3938,14 @@ pub fn make_router( .expect("construct launch timestamp header middleware"), ); + let force_metric_collection_on_scrape = state.conf.force_metric_collection_on_scrape; + + let prometheus_metrics_handler_wrapper = + move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape); + Ok(router .data(state) - .get("/metrics", |r| request_span(r, prometheus_metrics_handler)) + .get("/metrics", move |r| request_span(r, prometheus_metrics_handler_wrapper)) .get("/profile/cpu", |r| request_span(r, profile_cpu_handler)) .get("/profile/heap", |r| request_span(r, profile_heap_handler)) .get("/v1/status", |r| api_handler(r, status_handler)) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 0dd3c465e0..0864026f6b 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -73,6 +73,9 @@ pub struct HttpEndpointListener(pub CancellableTask); pub struct HttpsEndpointListener(pub CancellableTask); pub struct ConsumptionMetricsTasks(pub CancellableTask); pub struct DiskUsageEvictionTask(pub CancellableTask); +// HADRON +pub struct MetricsCollectionTask(pub CancellableTask); + impl CancellableTask { pub async fn shutdown(self) { self.cancel.cancel(); @@ -87,6 +90,7 @@ pub async fn shutdown_pageserver( https_listener: Option, page_service: page_service::Listener, grpc_task: Option, + metrics_collection_task: MetricsCollectionTask, consumption_metrics_worker: ConsumptionMetricsTasks, disk_usage_eviction_task: Option, tenant_manager: &TenantManager, @@ -211,6 +215,14 @@ pub async fn shutdown_pageserver( // Best effort to persist any outstanding deletions, to avoid leaking objects deletion_queue.shutdown(Duration::from_secs(5)).await; + // HADRON + timed( + metrics_collection_task.0.shutdown(), + "shutdown metrics collections metrics", + Duration::from_secs(1), + ) + .await; + timed( consumption_metrics_worker.0.shutdown(), "shutdown consumption metrics", diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 8fda625817..b2d5976ef4 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -37,6 +37,7 @@ use tracing::*; use utils::auth::{JwtAuth, Scope, SwappableJwtAuth}; use utils::id::NodeId; use utils::logging::{self, LogFormat, SecretString}; +use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR}; use utils::sentry_init::init_sentry; use utils::{pid_file, project_build_tag, project_git_version, tcp_listener}; @@ -243,6 +244,11 @@ struct Args { #[arg(long)] enable_tls_wal_service_api: bool, + /// Controls whether to collect all metrics on each scrape or to return potentially stale + /// results. + #[arg(long, default_value_t = true)] + force_metric_collection_on_scrape: bool, + /// Run in development mode (disables security checks) #[arg(long, help = "Run in development mode (disables security checks)")] dev: bool, @@ -428,6 +434,7 @@ async fn main() -> anyhow::Result<()> { ssl_ca_certs, use_https_safekeeper_api: args.use_https_safekeeper_api, enable_tls_wal_service_api: args.enable_tls_wal_service_api, + force_metric_collection_on_scrape: args.force_metric_collection_on_scrape, }); // initialize sentry if SENTRY_DSN is provided @@ -640,6 +647,26 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { .map(|res| ("broker main".to_owned(), res)); tasks_handles.push(Box::pin(broker_task_handle)); + /* BEGIN_HADRON */ + if conf.force_metric_collection_on_scrape { + let metrics_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| BACKGROUND_RUNTIME.handle()) + .spawn(async move { + let mut interval: tokio::time::Interval = + tokio::time::interval(METRICS_COLLECTION_INTERVAL); + loop { + interval.tick().await; + tokio::task::spawn_blocking(|| { + METRICS_COLLECTOR.run_once(true); + }); + } + }) + .map(|res| ("broker main".to_owned(), res)); + tasks_handles.push(Box::pin(metrics_handle)); + } + /* END_HADRON */ + set_build_info_metric(GIT_VERSION, BUILD_TAG); // TODO: update tokio-stream, convert to real async Stream with diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 384c582678..4b061c65d9 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -699,6 +699,11 @@ pub fn make_router( })) } + let force_metric_collection_on_scrape = conf.force_metric_collection_on_scrape; + + let prometheus_metrics_handler_wrapper = + move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape); + // NB: on any changes do not forget to update the OpenAPI spec // located nearby (/safekeeper/src/http/openapi_spec.yaml). let auth = conf.http_auth.clone(); @@ -706,7 +711,9 @@ pub fn make_router( .data(conf) .data(global_timelines) .data(auth) - .get("/metrics", |r| request_span(r, prometheus_metrics_handler)) + .get("/metrics", move |r| { + request_span(r, prometheus_metrics_handler_wrapper) + }) .get("/profile/cpu", |r| request_span(r, profile_cpu_handler)) .get("/profile/heap", |r| request_span(r, profile_heap_handler)) .get("/v1/status", |r| request_span(r, status_handler)) diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index c461c071da..c0b5403ebf 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -134,6 +134,7 @@ pub struct SafeKeeperConf { pub ssl_ca_certs: Vec, pub use_https_safekeeper_api: bool, pub enable_tls_wal_service_api: bool, + pub force_metric_collection_on_scrape: bool, } impl SafeKeeperConf { @@ -183,6 +184,7 @@ impl SafeKeeperConf { ssl_ca_certs: Vec::new(), use_https_safekeeper_api: false, enable_tls_wal_service_api: false, + force_metric_collection_on_scrape: true, } } } diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 7e10847a1b..0e8dfd64c3 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -166,7 +166,7 @@ fn hadron_determine_offloader(mgr: &Manager, state: &StateSnapshot) -> (Option) -> Result<()> { ssl_ca_certs: Vec::new(), use_https_safekeeper_api: false, enable_tls_wal_service_api: false, + force_metric_collection_on_scrape: true, }; let mut global = GlobalMap::new(disk, conf.clone())?; diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 79cfba8da6..8e7d957b22 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -1002,7 +1002,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): def get_metrics_str(self) -> str: """You probably want to use get_metrics() instead.""" - res = self.get(f"http://localhost:{self.port}/metrics") + res = self.get(f"http://localhost:{self.port}/metrics?use_latest=true") self.verbose_error(res) return res.text diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index 942b620be6..ceb00c0f90 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -143,7 +143,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): def get_metrics_str(self) -> str: """You probably want to use get_metrics() instead.""" - request_result = self.get(f"http://localhost:{self.port}/metrics") + request_result = self.get(f"http://localhost:{self.port}/metrics?use_latest=true") request_result.raise_for_status() return request_result.text