diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 3f592f167e..3316627540 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -224,21 +224,8 @@ async fn safe_rename_tenant_dir(path: impl AsRef) -> std::io::Result>); -enum BackgroundPurgesInner { - Open(tokio::task::JoinSet<()>), - // we use the async mutex for coalescing - ShuttingDown(Arc>>), -} - -impl Default for BackgroundPurges { - fn default() -> Self { - Self(Arc::new(std::sync::Mutex::new( - BackgroundPurgesInner::Open(JoinSet::new()), - ))) - } -} +#[derive(Clone, Default)] +pub struct BackgroundPurges(tokio_util::task::TaskTracker); impl BackgroundPurges { /// When we have moved a tenant's content to a temporary directory, we may delete it lazily in @@ -247,24 +234,32 @@ impl BackgroundPurges { /// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory. /// Thus the [`BackgroundPurges`] type to keep track of these tasks. pub fn spawn(&self, tmp_path: Utf8PathBuf) { - let mut guard = self.0.lock().unwrap(); - let jset = match &mut *guard { - BackgroundPurgesInner::Open(ref mut jset) => jset, - BackgroundPurgesInner::ShuttingDown(_) => { - warn!("trying to spawn background purge during shutdown, ignoring"); - return; + // because on shutdown we close and wait, we are misusing TaskTracker a bit. + // + // so first acquire a token, then check if the tracker has been closed. the tracker might get closed + // right after, but at least the shutdown will wait for what we are spawning next. + let token = self.0.token(); + + if self.0.is_closed() { + warn!( + %tmp_path, + "trying to spawn background purge during shutdown, ignoring" + ); + return; + } + + let span = info_span!(parent: None, "background_purge", %tmp_path); + + let task = move || { + let _token = token; + let _entered = span.entered(); + if let Err(error) = std::fs::remove_dir_all(tmp_path.as_path()) { + // should we fatal_io_error here? + warn!(%error, "failed to purge tenant directory"); } }; - jset.spawn_on( - async move { - if let Err(error) = fs::remove_dir_all(tmp_path.as_path()).await { - // should we fatal_io_error here? - warn!(%error, path=%tmp_path, "failed to purge tenant directory"); - } - } - .instrument(info_span!(parent: None, "background_purge")), - BACKGROUND_RUNTIME.handle(), - ); + + BACKGROUND_RUNTIME.spawn_blocking(task); } /// When this future completes, all background purges have completed. @@ -278,42 +273,9 @@ impl BackgroundPurges { /// instances of this future will continue to be correct. #[instrument(skip_all)] pub async fn shutdown(&self) { - let jset = { - let mut guard = self.0.lock().unwrap(); - match &mut *guard { - BackgroundPurgesInner::Open(jset) => { - *guard = BackgroundPurgesInner::ShuttingDown(Arc::new(tokio::sync::Mutex::new( - std::mem::take(jset), - ))) - } - BackgroundPurgesInner::ShuttingDown(_) => { - // calling shutdown multiple times is most likely a bug in pageserver shutdown code - warn!("already shutting down"); - } - }; - match &mut *guard { - BackgroundPurgesInner::ShuttingDown(ref mut jset) => jset.clone(), - BackgroundPurgesInner::Open(_) => { - unreachable!("above code transitions into shut down state"); - } - } - }; - let mut jset = jset.lock().await; // concurrent callers coalesce here - while let Some(res) = jset.join_next().await { - match res { - Ok(()) => {} - Err(e) if e.is_panic() => { - // If it panicked, the error is already logged by the panic hook. - } - Err(e) if e.is_cancelled() => { - unreachable!("we don't cancel the joinset or runtime") - } - Err(e) => { - // No idea when this can happen, but let's log it. - warn!(%e, "background purge task failed or panicked"); - } - } - } + // forbid new tasks (can be called many times) + self.0.close(); + self.0.wait().await; } } diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index c01b3a2e89..dadf5ca672 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -128,6 +128,8 @@ def test_tenant_delete_smoke( assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1 assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "inprogress"}) == 0 + env.pageserver.stop() + def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonEnvBuilder): """Reproduction of 2023-11-23 stuck tenants investigation""" @@ -200,11 +202,10 @@ def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonE if deletion is not None: deletion.join() + env.pageserver.stop() -def test_tenant_delete_races_timeline_creation( - neon_env_builder: NeonEnvBuilder, - pg_bin: PgBin, -): + +def test_tenant_delete_races_timeline_creation(neon_env_builder: NeonEnvBuilder): """ Validate that timeline creation executed in parallel with deletion works correctly. @@ -318,6 +319,8 @@ def test_tenant_delete_races_timeline_creation( # We deleted our only tenant, and the scrubber fails if it detects nothing neon_env_builder.disable_scrub_on_exit() + env.pageserver.stop() + def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder): """