diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index f971b0a88d..542c1b7b30 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -625,6 +625,7 @@ fn start_pageserver(
                     conf.synthetic_size_calculation_interval,
                     conf.id,
                     local_disk_storage,
+                    cancel,
                     metrics_ctx,
                 )
                 .instrument(info_span!("metrics_collection"))
diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs
index 9e8377c1f1..7ad6a0f890 100644
--- a/pageserver/src/consumption_metrics.rs
+++ b/pageserver/src/consumption_metrics.rs
@@ -3,7 +3,7 @@
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::tasks::BackgroundLoopKind;
-use crate::tenant::{mgr, LogicalSizeCalculationCause};
+use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError};
 use camino::Utf8PathBuf;
 use consumption_metrics::EventType;
 use pageserver_api::models::TenantState;
@@ -12,6 +12,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 use tokio::time::Instant;
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::id::NodeId;
@@ -37,6 +38,7 @@ type RawMetric = (MetricsKey, (EventType, u64));
 type Cache = HashMap<MetricsKey, (EventType, u64)>;
 
 /// Main thread that serves metrics collection
+#[allow(clippy::too_many_arguments)]
 pub async fn collect_metrics(
     metric_collection_endpoint: &Url,
     metric_collection_interval: Duration,
@@ -44,6 +46,7 @@
     synthetic_size_calculation_interval: Duration,
     node_id: NodeId,
     local_disk_storage: Utf8PathBuf,
+    cancel: CancellationToken,
     ctx: RequestContext,
 ) -> anyhow::Result<()> {
     if _cached_metric_collection_interval != Duration::ZERO {
@@ -63,9 +66,13 @@
         "synthetic size calculation",
         false,
         async move {
-            calculate_synthetic_size_worker(synthetic_size_calculation_interval, &worker_ctx)
-                .instrument(info_span!("synthetic_size_worker"))
-                .await?;
+            calculate_synthetic_size_worker(
+                synthetic_size_calculation_interval,
+                &cancel,
+                &worker_ctx,
+            )
+            .instrument(info_span!("synthetic_size_worker"))
+            .await?;
             Ok(())
         },
     );
@@ -241,6 +248,7 @@ async fn reschedule(
 /// Calculate synthetic size for each active tenant
 async fn calculate_synthetic_size_worker(
     synthetic_size_calculation_interval: Duration,
+    cancel: &CancellationToken,
     ctx: &RequestContext,
 ) -> anyhow::Result<()> {
     info!("starting calculate_synthetic_size_worker");
@@ -272,7 +280,12 @@
                 // Same for the loop that fetches computed metrics.
                 // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
                 // which turns out is really handy to understand the system.
-                if let Err(e) = tenant.calculate_synthetic_size(cause, ctx).await {
+                if let Err(e) = tenant.calculate_synthetic_size(cause, cancel, ctx).await {
+                    if let Some(PageReconstructError::Cancelled) =
+                        e.downcast_ref::<PageReconstructError>()
+                    {
+                        return Ok(());
+                    }
                     error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
                 }
             }
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 21fd4d786a..8bc652acef 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -550,7 +550,7 @@ async fn timeline_detail_handler(
 
 async fn get_lsn_by_timestamp_handler(
     request: Request<Body>,
-    _cancel: CancellationToken,
+    cancel: CancellationToken,
 ) -> Result<Response<Body>, ApiError> {
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     check_permission(&request, Some(tenant_id))?;
@@ -566,7 +566,9 @@
     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
 
     let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
-    let result = timeline.find_lsn_for_timestamp(timestamp_pg, &ctx).await?;
+    let result = timeline
+        .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
+        .await?;
 
     if version.unwrap_or(0) > 1 {
         #[derive(serde::Serialize)]
@@ -842,7 +844,7 @@ async fn tenant_delete_handler(
 /// without modifying anything anyway.
 async fn tenant_size_handler(
     request: Request<Body>,
-    _cancel: CancellationToken,
+    cancel: CancellationToken,
 ) -> Result<Response<Body>, ApiError> {
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     check_permission(&request, Some(tenant_id))?;
@@ -858,6 +860,7 @@
         .gather_size_inputs(
             retention_period,
             LogicalSizeCalculationCause::TenantSizeHandler,
+            &cancel,
             &ctx,
         )
         .await
@@ -1242,7 +1245,7 @@ async fn failpoints_handler(
 // Run GC immediately on given timeline.
 async fn timeline_gc_handler(
     mut request: Request<Body>,
-    _cancel: CancellationToken,
+    cancel: CancellationToken,
 ) -> Result<Response<Body>, ApiError> {
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -1251,7 +1254,7 @@
     let gc_req: TimelineGcRequest = json_request(&mut request).await?;
 
     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
-    let wait_task_done = mgr::immediate_gc(tenant_id, timeline_id, gc_req, &ctx).await?;
+    let wait_task_done = mgr::immediate_gc(tenant_id, timeline_id, gc_req, cancel, &ctx).await?;
     let gc_result = wait_task_done
         .await
         .context("wait for gc task")
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 827278af72..15d5609ceb 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -21,6 +21,7 @@ use serde::{Deserialize, Serialize};
 use std::collections::{hash_map, HashMap, HashSet};
 use std::ops::ControlFlow;
 use std::ops::Range;
+use tokio_util::sync::CancellationToken;
 use tracing::{debug, trace, warn};
 use utils::bin_ser::DeserializeError;
 use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -365,6 +366,7 @@ impl Timeline {
     pub async fn find_lsn_for_timestamp(
         &self,
         search_timestamp: TimestampTz,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> Result<LsnForTimestamp, PageReconstructError> {
         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
@@ -383,6 +385,9 @@ impl Timeline {
         let mut found_smaller = false;
         let mut found_larger = false;
         while low < high {
+            if cancel.is_cancelled() {
+                return Err(PageReconstructError::Cancelled);
+            }
             // cannot overflow, high and low are both smaller than u64::MAX / 2
             let mid = (high + low) / 2;
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 7ec1395e05..780652f51e 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -1630,6 +1630,7 @@ impl Tenant {
         target_timeline_id: Option<TimelineId>,
         horizon: u64,
         pitr: Duration,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<GcResult> {
         // Don't start doing work during shutdown
@@ -1652,7 +1653,7 @@
             }
         }
 
-        self.gc_iteration_internal(target_timeline_id, horizon, pitr, ctx)
+        self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
             .await
     }
@@ -2569,14 +2570,30 @@
         target_timeline_id: Option<TimelineId>,
         horizon: u64,
         pitr: Duration,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<GcResult> {
         let mut totals: GcResult = Default::default();
         let now = Instant::now();
 
-        let gc_timelines = self
-            .refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
-            .await?;
+        let gc_timelines = match self
+            .refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
+            .await
+        {
+            Ok(result) => result,
+            Err(e) => {
+                if let Some(PageReconstructError::Cancelled) =
+                    e.downcast_ref::<PageReconstructError>()
+                {
+                    // Handle cancellation
+                    totals.elapsed = now.elapsed();
+                    return Ok(totals);
+                } else {
+                    // Propagate other errors
+                    return Err(e);
+                }
+            }
+        };
 
         crate::failpoint_support::sleep_millis_async!(
             "gc_iteration_internal_after_getting_gc_timelines"
@@ -2600,7 +2617,7 @@
         // See comments in [`Tenant::branch_timeline`] for more information
         // about why branch creation task can run concurrently with timeline's GC iteration.
         for timeline in gc_timelines {
-            if task_mgr::is_shutdown_requested() {
+            if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
                 // We were requested to shut down. Stop and return with the progress we
                 // made.
                 break;
             }
@@ -2620,6 +2637,7 @@
     /// This is usually executed as part of periodic gc, but can now be triggered more often.
     pub async fn refresh_gc_info(
         &self,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
         // since this method can now be called at different rates than the configured gc loop, it
@@ -2631,7 +2649,7 @@
         // refresh all timelines
         let target_timeline_id = None;
 
-        self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
+        self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
             .await
     }
@@ -2640,6 +2658,7 @@
         target_timeline_id: Option<TimelineId>,
         horizon: u64,
         pitr: Duration,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
         // grab mutex to prevent new timelines from being created here.
@@ -2713,7 +2732,7 @@
                     .map(|&x| x.1)
                     .collect();
                 timeline
-                    .update_gc_info(branchpoints, cutoff, pitr, ctx)
+                    .update_gc_info(branchpoints, cutoff, pitr, cancel, ctx)
                     .await?;
 
                 gc_timelines.push(timeline);
@@ -3126,6 +3145,7 @@
         // (only if it is shorter than the real cutoff).
         max_retention_period: Option<u64>,
         cause: LogicalSizeCalculationCause,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<size::ModelInputs> {
         let logical_sizes_at_once = self
@@ -3148,6 +3168,7 @@
             max_retention_period,
             &mut shared_cache,
             cause,
+            cancel,
             ctx,
         )
         .await
@@ -3160,9 +3181,10 @@
     pub async fn calculate_synthetic_size(
         &self,
         cause: LogicalSizeCalculationCause,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<u64> {
-        let inputs = self.gather_size_inputs(None, cause, ctx).await?;
+        let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
 
         let size = inputs.calculate()?;
 
@@ -3934,7 +3956,13 @@ mod tests {
         // and compaction works. But it does set the 'cutoff' point so that the cross check
         // below should fail.
         tenant
-            .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
+            .gc_iteration(
+                Some(TIMELINE_ID),
+                0x10,
+                Duration::ZERO,
+                &CancellationToken::new(),
+                &ctx,
+            )
             .await?;
 
         // try to branch at lsn 25, should fail because we already garbage collected the data
@@ -4037,7 +4065,13 @@ mod tests {
         tline.set_broken("test".to_owned());
 
         tenant
-            .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
+            .gc_iteration(
+                Some(TIMELINE_ID),
+                0x10,
+                Duration::ZERO,
+                &CancellationToken::new(),
+                &ctx,
+            )
             .await?;
 
         // The branchpoints should contain all timelines, even ones marked
@@ -4083,7 +4117,13 @@ mod tests {
             .expect("Should have a local timeline");
         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
         tenant
-            .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
+            .gc_iteration(
+                Some(TIMELINE_ID),
+                0x10,
+                Duration::ZERO,
+                &CancellationToken::new(),
+                &ctx,
+            )
             .await?;
 
         assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
@@ -4111,7 +4151,13 @@ mod tests {
 
         // run gc on parent
         tenant
-            .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
+            .gc_iteration(
+                Some(TIMELINE_ID),
+                0x10,
+                Duration::ZERO,
+                &CancellationToken::new(),
+                &ctx,
+            )
             .await?;
 
         // Check that the data is still accessible on the branch.
@@ -4421,7 +4467,13 @@ mod tests {
         let cutoff = tline.get_last_record_lsn();
 
         tline
-            .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
+            .update_gc_info(
+                Vec::new(),
+                cutoff,
+                Duration::ZERO,
+                &CancellationToken::new(),
+                &ctx,
+            )
             .await?;
         tline.freeze_and_flush().await?;
         tline
@@ -4503,7 +4555,13 @@ mod tests {
         // Perform a cycle of flush, compact, and GC
         let cutoff = tline.get_last_record_lsn();
         tline
-            .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
+            .update_gc_info(
+                Vec::new(),
+                cutoff,
+                Duration::ZERO,
+                &CancellationToken::new(),
+                &ctx,
+            )
             .await?;
         tline.freeze_and_flush().await?;
         tline
@@ -4595,7 +4653,13 @@ mod tests {
         // Perform a cycle of flush, compact, and GC
         let cutoff = tline.get_last_record_lsn();
         tline
-            .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
+            .update_gc_info(
+                Vec::new(),
+                cutoff,
+                Duration::ZERO,
+                &CancellationToken::new(),
+                &ctx,
+            )
             .await?;
         tline.freeze_and_flush().await?;
         tline
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index a766cca0c5..3ff7425bc2 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -1944,6 +1944,7 @@ pub(crate) async fn immediate_gc(
     tenant_id: TenantId,
     timeline_id: TimelineId,
     gc_req: TimelineGcRequest,
+    cancel: CancellationToken,
     ctx: &RequestContext,
 ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
     let guard = TENANTS.read().unwrap();
@@ -1970,7 +1971,7 @@
         async move {
             fail::fail_point!("immediate_gc_task_pre");
             let result = tenant
-                .gc_iteration(Some(timeline_id), gc_horizon, pitr, &ctx)
+                .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
                 .instrument(info_span!("manual_gc", %tenant_id, %timeline_id))
                 .await;
             // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs
index a85dc9231c..e0b1652d98 100644
--- a/pageserver/src/tenant/size.rs
+++ b/pageserver/src/tenant/size.rs
@@ -6,6 +6,7 @@ use std::sync::Arc;
 use anyhow::{bail, Context};
 use tokio::sync::oneshot::error::RecvError;
 use tokio::sync::Semaphore;
+use tokio_util::sync::CancellationToken;
 
 use crate::context::RequestContext;
 use crate::pgdatadir_mapping::CalculateLogicalSizeError;
@@ -113,11 +114,12 @@ pub(super) async fn gather_inputs(
     max_retention_period: Option<u64>,
     logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
     cause: LogicalSizeCalculationCause,
+    cancel: &CancellationToken,
     ctx: &RequestContext,
 ) -> anyhow::Result<ModelInputs> {
     // refresh is needed to update gc related pitr_cutoff and horizon_cutoff
     tenant
-        .refresh_gc_info(ctx)
+        .refresh_gc_info(cancel, ctx)
         .await
         .context("Failed to refresh gc_info before gathering inputs")?;
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index 27dff9f54a..860bb255ca 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -261,7 +261,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
             } else {
                 // Run gc
                 let res = tenant
-                    .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
+                    .gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx)
                     .await;
                 if let Err(e) = res {
                     let wait_duration = backoff::exponential_backoff_duration_seconds(
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 763b18ccc3..38b8832281 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -3726,6 +3726,7 @@ impl Timeline {
         retain_lsns: Vec<Lsn>,
         cutoff_horizon: Lsn,
         pitr: Duration,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
         // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
@@ -3739,7 +3740,10 @@
         if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
             let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
 
-            match self.find_lsn_for_timestamp(pitr_timestamp, ctx).await? {
+            match self
+                .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
+                .await?
+            {
                 LsnForTimestamp::Present(lsn) => lsn,
                 LsnForTimestamp::Future(lsn) => {
                     // The timestamp is in the future. That sounds impossible,
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs
index 79bc434a2a..f4a4c26c06 100644
--- a/pageserver/src/tenant/timeline/eviction_task.rs
+++ b/pageserver/src/tenant/timeline/eviction_task.rs
@@ -351,7 +351,7 @@
         match state.last_layer_access_imitation {
             Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
             _ => {
-                self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel)
+                self.imitate_synthetic_size_calculation_worker(&tenant, cancel, ctx)
                     .await;
                 state.last_layer_access_imitation = Some(tokio::time::Instant::now());
             }
@@ -417,8 +417,8 @@
     async fn imitate_synthetic_size_calculation_worker(
         &self,
         tenant: &Arc<Tenant>,
-        ctx: &RequestContext,
         cancel: &CancellationToken,
+        ctx: &RequestContext,
     ) {
         if self.conf.metric_collection_endpoint.is_none() {
             // We don't start the consumption metrics task if this is not set in the config.
@@ -457,6 +457,7 @@
             None,
             &mut throwaway_cache,
             LogicalSizeCalculationCause::EvictionTaskImitation,
+            cancel,
             ctx,
         )
         .instrument(info_span!("gather_inputs"));
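
The sketch below is editorial illustration, not part of the patch. It is a minimal, self-contained example of the cooperative-cancellation pattern the patch threads through gc_iteration, find_lsn_for_timestamp, and calculate_synthetic_size: the worker loop polls CancellationToken::is_cancelled() and bails out with a dedicated "cancelled" error, and the caller downcasts that error and treats it as a clean early exit rather than a failure. The names Cancelled, busy_search, and run_once are hypothetical stand-ins for PageReconstructError::Cancelled and the pageserver functions touched above; the only assumed dependencies are tokio (with the "full" feature), tokio-util, and anyhow.

// Illustrative sketch only -- not pageserver code.
use std::time::Duration;

use tokio_util::sync::CancellationToken;

// Stand-in for `PageReconstructError::Cancelled`.
#[derive(Debug)]
struct Cancelled;

impl std::fmt::Display for Cancelled {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "operation was cancelled")
    }
}

impl std::error::Error for Cancelled {}

/// Long-running binary-search-style loop that checks the token once per
/// iteration, mirroring the check added to `find_lsn_for_timestamp`.
async fn busy_search(cancel: &CancellationToken) -> anyhow::Result<u64> {
    let (mut low, mut high) = (0u64, 1u64 << 20);
    while low < high {
        if cancel.is_cancelled() {
            return Err(Cancelled.into());
        }
        let mid = (high + low) / 2;
        // Pretend each probe of `mid` does real work (e.g. a layer lookup).
        tokio::time::sleep(Duration::from_millis(10)).await;
        high = mid;
    }
    Ok(low)
}

/// Caller side, mirroring `gc_iteration_internal`: cancellation of a sub-step
/// is swallowed and treated as "stopped early"; every other error propagates.
async fn run_once(cancel: &CancellationToken) -> anyhow::Result<()> {
    match busy_search(cancel).await {
        Ok(result) => println!("finished, result = {result}"),
        Err(e) if e.downcast_ref::<Cancelled>().is_some() => {
            println!("cancelled, returning early with partial progress");
        }
        Err(e) => return Err(e),
    }
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cancel = CancellationToken::new();
    // Hand the worker its own child token so it is cancelled together with the parent.
    let child = cancel.child_token();
    let worker = tokio::spawn(async move { run_once(&child).await });

    // Simulate shutdown shortly after the worker starts; the loop notices the
    // cancellation on its next iteration instead of running to completion.
    tokio::time::sleep(Duration::from_millis(35)).await;
    cancel.cancel();

    worker.await??;
    Ok(())
}

Polling is_cancelled() at loop boundaries, as the patch does, avoids racing a select! against work that is not cancel-safe; the token is only observed between whole iterations, so partial progress is never torn.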