diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2362f19068..c3103917ee 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3461,9 +3461,8 @@ impl Tenant { // Run each timeline's flush in a task holding the timeline's gate: this // means that if this function's future is cancelled, the Timeline shutdown // will still wait for any I/O in here to complete. - let gate = match timeline.gate.enter() { - Ok(g) => g, - Err(_) => continue, + let Ok(gate) = timeline.gate.enter() else { + continue; }; let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await }); results.push(jh); diff --git a/pageserver/src/tenant/secondary/heatmap_uploader.rs b/pageserver/src/tenant/secondary/heatmap_uploader.rs index 660459a733..147cf683ba 100644 --- a/pageserver/src/tenant/secondary/heatmap_uploader.rs +++ b/pageserver/src/tenant/secondary/heatmap_uploader.rs @@ -373,12 +373,9 @@ async fn upload_tenant_heatmap( // Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise // when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind // in remote storage. - let _guard = match tenant.gate.enter() { - Ok(g) => g, - Err(_) => { - tracing::info!("Skipping heatmap upload for tenant which is shutting down"); - return Err(UploadHeatmapError::Cancelled); - } + let Ok(_guard) = tenant.gate.enter() else { + tracing::info!("Skipping heatmap upload for tenant which is shutting down"); + return Err(UploadHeatmapError::Cancelled); }; for (timeline_id, timeline) in timelines { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b14eafa194..d13d4dc7d4 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -33,7 +33,10 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::*; -use utils::{bin_ser::BeSer, sync::gate::Gate}; +use utils::{ + bin_ser::BeSer, + sync::gate::{Gate, GateGuard}, +}; use std::ops::{Deref, Range}; use std::pin::pin; @@ -2288,14 +2291,17 @@ impl Timeline { // accurate relation sizes, and they do not emit consumption metrics. debug_assert!(self.tenant_shard_id.is_zero()); - let _guard = self.gate.enter(); + let guard = self + .gate + .enter() + .map_err(|_| CalculateLogicalSizeError::Cancelled)?; let self_calculation = Arc::clone(self); let mut calculation = pin!(async { let ctx = ctx.attached_child(); self_calculation - .calculate_logical_size(lsn, cause, &ctx) + .calculate_logical_size(lsn, cause, &guard, &ctx) .await }); @@ -2324,33 +2330,16 @@ impl Timeline { &self, up_to_lsn: Lsn, cause: LogicalSizeCalculationCause, + _guard: &GateGuard, ctx: &RequestContext, ) -> Result { info!( "Calculating logical size for timeline {} at {}", self.timeline_id, up_to_lsn ); - // These failpoints are used by python tests to ensure that we don't delete - // the timeline while the logical size computation is ongoing. - // The first failpoint is used to make this function pause. - // Then the python test initiates timeline delete operation in a thread. - // It waits for a few seconds, then arms the second failpoint and disables - // the first failpoint. The second failpoint prints an error if the timeline - // delete code has deleted the on-disk state while we're still running here. - // It shouldn't do that. If it does it anyway, the error will be caught - // by the test suite, highlighting the problem. - fail::fail_point!("timeline-calculate-logical-size-pause"); - fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| { - if !self - .conf - .timeline_path(&self.tenant_shard_id, &self.timeline_id) - .exists() - { - error!("timeline-calculate-logical-size-pre metadata file does not exist") - } - // need to return something - Ok(0) - }); + + pausable_failpoint!("timeline-calculate-logical-size-pause"); + // See if we've already done the work for initial size calculation. // This is a short-cut for timelines that are mostly unused. if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) { diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 127e351c14..008f9482c4 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -34,7 +34,7 @@ use crate::{ }, }; -use utils::completion; +use utils::{completion, sync::gate::GateGuard}; use super::Timeline; @@ -81,6 +81,12 @@ impl Timeline { #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))] async fn eviction_task(self: Arc, cancel: CancellationToken) { use crate::tenant::tasks::random_init_delay; + + // acquire the gate guard only once within a useful span + let Ok(guard) = self.gate.enter() else { + return; + }; + { let policy = self.get_eviction_policy(); let period = match policy { @@ -96,7 +102,9 @@ impl Timeline { let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn); loop { let policy = self.get_eviction_policy(); - let cf = self.eviction_iteration(&policy, &cancel, &ctx).await; + let cf = self + .eviction_iteration(&policy, &cancel, &guard, &ctx) + .await; match cf { ControlFlow::Break(()) => break, @@ -117,6 +125,7 @@ impl Timeline { self: &Arc, policy: &EvictionPolicy, cancel: &CancellationToken, + gate: &GateGuard, ctx: &RequestContext, ) -> ControlFlow<(), Instant> { debug!("eviction iteration: {policy:?}"); @@ -127,14 +136,17 @@ impl Timeline { return ControlFlow::Continue(Instant::now() + Duration::from_secs(10)); } EvictionPolicy::LayerAccessThreshold(p) => { - match self.eviction_iteration_threshold(p, cancel, ctx).await { + match self + .eviction_iteration_threshold(p, cancel, gate, ctx) + .await + { ControlFlow::Break(()) => return ControlFlow::Break(()), ControlFlow::Continue(()) => (), } (p.period, p.threshold) } EvictionPolicy::OnlyImitiate(p) => { - if self.imitiate_only(p, cancel, ctx).await.is_break() { + if self.imitiate_only(p, cancel, gate, ctx).await.is_break() { return ControlFlow::Break(()); } (p.period, p.threshold) @@ -165,6 +177,7 @@ impl Timeline { self: &Arc, p: &EvictionPolicyLayerAccessThreshold, cancel: &CancellationToken, + gate: &GateGuard, ctx: &RequestContext, ) -> ControlFlow<()> { let now = SystemTime::now(); @@ -180,7 +193,7 @@ impl Timeline { _ = self.cancel.cancelled() => return ControlFlow::Break(()), }; - match self.imitate_layer_accesses(p, cancel, ctx).await { + match self.imitate_layer_accesses(p, cancel, gate, ctx).await { ControlFlow::Break(()) => return ControlFlow::Break(()), ControlFlow::Continue(()) => (), } @@ -302,6 +315,7 @@ impl Timeline { self: &Arc, p: &EvictionPolicyLayerAccessThreshold, cancel: &CancellationToken, + gate: &GateGuard, ctx: &RequestContext, ) -> ControlFlow<()> { let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit( @@ -315,7 +329,7 @@ impl Timeline { _ = self.cancel.cancelled() => return ControlFlow::Break(()), }; - self.imitate_layer_accesses(p, cancel, ctx).await + self.imitate_layer_accesses(p, cancel, gate, ctx).await } /// If we evict layers but keep cached values derived from those layers, then @@ -347,6 +361,7 @@ impl Timeline { &self, p: &EvictionPolicyLayerAccessThreshold, cancel: &CancellationToken, + gate: &GateGuard, ctx: &RequestContext, ) -> ControlFlow<()> { if !self.tenant_shard_id.is_zero() { @@ -365,7 +380,7 @@ impl Timeline { match state.last_layer_access_imitation { Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ } _ => { - self.imitate_timeline_cached_layer_accesses(ctx).await; + self.imitate_timeline_cached_layer_accesses(gate, ctx).await; state.last_layer_access_imitation = Some(tokio::time::Instant::now()) } } @@ -405,12 +420,21 @@ impl Timeline { /// Recompute the values which would cause on-demand downloads during restart. #[instrument(skip_all)] - async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) { + async fn imitate_timeline_cached_layer_accesses( + &self, + guard: &GateGuard, + ctx: &RequestContext, + ) { let lsn = self.get_last_record_lsn(); // imitiate on-restart initial logical size let size = self - .calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx) + .calculate_logical_size( + lsn, + LogicalSizeCalculationCause::EvictionTaskImitation, + guard, + ctx, + ) .instrument(info_span!("calculate_logical_size")) .await; diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 0788c49c7b..327e5abe26 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -1,8 +1,6 @@ import concurrent.futures import math -import queue import random -import threading import time from contextlib import closing from pathlib import Path @@ -20,7 +18,6 @@ from fixtures.neon_fixtures import ( VanillaPostgres, wait_for_last_flush_lsn, ) -from fixtures.pageserver.http import PageserverApiException from fixtures.pageserver.utils import ( assert_tenant_state, timeline_delete_wait_completed, @@ -331,41 +328,18 @@ def test_timeline_initial_logical_size_calculation_cancellation( assert_size_calculation_not_done() log.info( - f"try to delete the timeline using {deletion_method}, this should cancel size computation tasks and wait for them to finish" + f"delete the timeline using {deletion_method}, this should cancel size computation tasks and wait for them to finish" ) - delete_timeline_success: queue.Queue[bool] = queue.Queue(maxsize=1) - def delete_timeline_thread_fn(): - try: - if deletion_method == "tenant_detach": - client.tenant_detach(tenant_id) - elif deletion_method == "timeline_delete": - timeline_delete_wait_completed(client, tenant_id, timeline_id) - delete_timeline_success.put(True) - except PageserverApiException: - delete_timeline_success.put(False) - raise + if deletion_method == "tenant_detach": + client.tenant_detach(tenant_id) + elif deletion_method == "timeline_delete": + timeline_delete_wait_completed(client, tenant_id, timeline_id) + else: + raise RuntimeError(deletion_method) - delete_timeline_thread = threading.Thread(target=delete_timeline_thread_fn) - delete_timeline_thread.start() - # give it some time to settle in the state where it waits for size computation task - time.sleep(5) - if not delete_timeline_success.empty(): - raise AssertionError( - f"test is broken, the {deletion_method} should be stuck waiting for size computation task, got result {delete_timeline_success.get()}" - ) - - log.info( - "resume the size calculation. The failpoint checks that the timeline directory still exists." - ) - client.configure_failpoints(("timeline-calculate-logical-size-check-dir-exists", "return")) - client.configure_failpoints(("timeline-calculate-logical-size-pause", "off")) - - log.info("wait for delete timeline thread to finish and assert that it succeeded") - assert delete_timeline_success.get() - - # if the implementation is incorrect, the teardown would complain about an error log - # message emitted by the code behind failpoint "timeline-calculate-logical-size-check-dir-exists" + # timeline-calculate-logical-size-pause is still paused, but it doesn't + # matter because it's a pausable_failpoint, which can be cancelled by drop. def test_timeline_physical_size_init(neon_env_builder: NeonEnvBuilder):