diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index b13521e0ed..16a06e5fd1 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -325,6 +325,17 @@ impl From for WalRedoManager {
 }
 
 impl WalRedoManager {
+    /// Shuts down the wrapped redo manager; does nothing for the test redo manager.
+    pub(crate) async fn shutdown(&self) {
+        match self {
+            Self::Prod(mgr) => mgr.shutdown().await,
+            #[cfg(test)]
+            Self::Test(_) => {
+                // Not applicable to test redo manager
+            }
+        }
+    }
+
     pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
         match self {
             Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
@@ -1880,6 +1891,10 @@ impl Tenant {
         tracing::debug!("Waiting for tasks...");
         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;
 
+        if let Some(walredo_mgr) = self.walredo_mgr.as_ref() {
+            walredo_mgr.shutdown().await;
+        }
+
         // Wait for any in-flight operations to complete
         self.gate.close().await;
 
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index e9782e64b1..e4a6684db4 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -87,7 +87,13 @@ pub struct PostgresRedoManager {
     /// still be using the old redo process. But, those other tasks will most likely
     /// encounter an error as well, and errors are an unexpected condition anyway.
     /// So, probably we could get rid of the `Arc` in the future.
-    redo_process: heavier_once_cell::OnceCell>,
+    redo_process: heavier_once_cell::OnceCell<RedoProcessState>,
+    launch_process_gate: Gate,
+}
+
+enum RedoProcessState {
+    Launched(Arc), // handle to the currently launched walredo process
+    ManagerShutDown, // stored by `shutdown()`; redo requests then fail with `Error::Cancelled`
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -185,10 +191,10 @@ impl PostgresRedoManager {
                     chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
                 })
             },
-            process: self
-                .redo_process
-                .get()
-                .map(|p| WalRedoManagerProcessStatus { pid: p.id() }),
+            process: self.redo_process.get().and_then(|p| match &*p {
+                RedoProcessState::Launched(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
+                RedoProcessState::ManagerShutDown => None,
+            }),
         }
     }
 }
@@ -207,9 +213,27 @@ impl PostgresRedoManager {
             tenant_shard_id,
             last_redo_at: std::sync::Mutex::default(),
             redo_process: heavier_once_cell::OnceCell::default(),
+            launch_process_gate: Gate::default(),
         }
     }
 
+    pub async fn shutdown(&self) {
+        // Prevent new launches: obtain the init permit (taking out any stored state) and store `ManagerShutDown`.
+        let permit = match self.redo_process.get_or_init_detached().await {
+            Ok(guard) => {
+                let (proc, permit) = guard.take_and_deinit();
+                drop(proc); // this just drops the Arc, its refcount may not be zero yet
+                permit
+            }
+            Err(permit) => permit,
+        };
+        self.redo_process
+            .set(RedoProcessState::ManagerShutDown, permit);
+
+        // Wait for all WalRedoProcess objects to get dropped. NOTE(review): no `launch_process_gate.enter()` is visible in this patch; confirm launches hold a gate guard.
+        self.launch_process_gate.close().await;
+    }
+
     /// This type doesn't have its own background task to check for idleness: we
     /// rely on our owner calling this function periodically in its own housekeeping
     /// loops.
@@ -249,7 +273,12 @@ impl PostgresRedoManager {
 
         loop {
             let proc: Arc = match self.redo_process.get_or_init_detached().await {
-                Ok(guard) => Arc::clone(&guard),
+                Ok(guard) => match &*guard {
+                    RedoProcessState::Launched(proc) => Arc::clone(proc),
+                    RedoProcessState::ManagerShutDown => {
+                        return Err(Error::Cancelled);
+                    }
+                },
                 Err(permit) => {
                     // don't hold poison_guard, the launch code can bail
                     let start = Instant::now();
@@ -273,7 +302,8 @@ impl PostgresRedoManager {
                         pid = proc.id(),
                         "launched walredo process"
                     );
-                    self.redo_process.set(Arc::clone(&proc), permit);
+                    self.redo_process
+                        .set(RedoProcessState::Launched(Arc::clone(&proc)), permit);
                     proc
                 }
             };
@@ -348,12 +378,17 @@ impl PostgresRedoManager {
                 match self.redo_process.get() {
                     None => (),
                     Some(guard) => {
-                        if Arc::ptr_eq(&proc, &*guard) {
-                            // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
-                            guard.take_and_deinit();
-                        } else {
-                            // Another task already spawned another redo process (further up in this method)
-                            // and put it into `redo_process`. Do nothing, our view of the world is behind.
+                        match &*guard {
+                            RedoProcessState::ManagerShutDown => {}
+                            RedoProcessState::Launched(guard_proc) => {
+                                if Arc::ptr_eq(&proc, guard_proc) {
+                                    // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
+                                    guard.take_and_deinit();
+                                } else {
+                                    // Another task already spawned another redo process (further up in this method)
+                                    // and put it into `redo_process`. Do nothing, our view of the world is behind.
+                                }
+                            }
                         }
                     }
                 }