diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 13f7977946..061045eb76 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -11,6 +11,7 @@ use reqwest::Url; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, SystemTime}; +use tokio::time::Instant; use tracing::*; use utils::id::NodeId; @@ -88,22 +89,12 @@ pub async fn collect_metrics( let node_id = node_id.to_string(); - // reminder: ticker is ready immediatedly - let mut ticker = tokio::time::interval(metric_collection_interval); - loop { - let tick_at = tokio::select! { - _ = cancel.cancelled() => return Ok(()), - tick_at = ticker.tick() => tick_at, - }; + let started_at = Instant::now(); // these are point in time, with variable "now" let metrics = metrics::collect_all_metrics(&cached_metrics, &ctx).await; - if metrics.is_empty() { - continue; - } - let metrics = Arc::new(metrics); // why not race cancellation here? because we are one of the last tasks, and if we are @@ -142,10 +133,19 @@ pub async fn collect_metrics( let (_, _) = tokio::join!(flush, upload); crate::tenant::tasks::warn_when_period_overrun( - tick_at.elapsed(), + started_at.elapsed(), metric_collection_interval, BackgroundLoopKind::ConsumptionMetricsCollectMetrics, ); + + let res = tokio::time::timeout_at( + started_at + metric_collection_interval, + task_mgr::shutdown_token().cancelled(), + ) + .await; + if res.is_ok() { + return Ok(()); + } } } @@ -244,16 +244,14 @@ async fn calculate_synthetic_size_worker( ctx: &RequestContext, ) -> anyhow::Result<()> { info!("starting calculate_synthetic_size_worker"); + scopeguard::defer! { + info!("calculate_synthetic_size_worker stopped"); + }; - // reminder: ticker is ready immediatedly - let mut ticker = tokio::time::interval(synthetic_size_calculation_interval); let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize; loop { - let tick_at = tokio::select! { - _ = task_mgr::shutdown_watcher() => return Ok(()), - tick_at = ticker.tick() => tick_at, - }; + let started_at = Instant::now(); let tenants = match mgr::list_tenants().await { Ok(tenants) => tenants, @@ -281,9 +279,18 @@ async fn calculate_synthetic_size_worker( } crate::tenant::tasks::warn_when_period_overrun( - tick_at.elapsed(), + started_at.elapsed(), synthetic_size_calculation_interval, BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker, ); + + let res = tokio::time::timeout_at( + started_at + synthetic_size_calculation_interval, + task_mgr::shutdown_token().cancelled(), + ) + .await; + if res.is_ok() { + return Ok(()); + } } }