mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
the fix: shutdown method for WalRedoManager, so the Arc<> can outlive the process
This commit is contained in:
@@ -325,6 +325,17 @@ impl From<harness::TestRedoManager> for WalRedoManager {
|
||||
}
|
||||
|
||||
impl WalRedoManager {
|
||||
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
match self {
|
||||
Self::Prod(mgr) => mgr.shutdown().await,
|
||||
#[cfg(test)]
|
||||
Self::Test(mgr) => {
|
||||
// Not applicable to test redo manager
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
|
||||
match self {
|
||||
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
|
||||
@@ -1880,6 +1891,10 @@ impl Tenant {
|
||||
tracing::debug!("Waiting for tasks...");
|
||||
task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;
|
||||
|
||||
if let Some(walredo_mgr) = self.walredo_mgr.as_ref() {
|
||||
walredo_mgr.shutdown().await;
|
||||
}
|
||||
|
||||
// Wait for any in-flight operations to complete
|
||||
self.gate.close().await;
|
||||
|
||||
|
||||
@@ -87,7 +87,13 @@ pub struct PostgresRedoManager {
|
||||
/// still be using the old redo process. But, those other tasks will most likely
|
||||
/// encounter an error as well, and errors are an unexpected condition anyway.
|
||||
/// So, probably we could get rid of the `Arc` in the future.
|
||||
redo_process: heavier_once_cell::OnceCell<Arc<process::WalRedoProcess>>,
|
||||
redo_process: heavier_once_cell::OnceCell<RedoProcessState>,
|
||||
launch_process_gate: Gate,
|
||||
}
|
||||
|
||||
enum RedoProcessState {
|
||||
Launched(Arc<process::WalRedoProcess>),
|
||||
ManagerShutDown,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -185,10 +191,10 @@ impl PostgresRedoManager {
|
||||
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
|
||||
})
|
||||
},
|
||||
process: self
|
||||
.redo_process
|
||||
.get()
|
||||
.map(|p| WalRedoManagerProcessStatus { pid: p.id() }),
|
||||
process: self.redo_process.get().and_then(|p| match &*p {
|
||||
RedoProcessState::Launched(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
|
||||
RedoProcessState::ManagerShutDown => None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -207,9 +213,27 @@ impl PostgresRedoManager {
|
||||
tenant_shard_id,
|
||||
last_redo_at: std::sync::Mutex::default(),
|
||||
redo_process: heavier_once_cell::OnceCell::default(),
|
||||
launch_process_gate: Gate::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) {
|
||||
// prevent new launches
|
||||
let permit = match self.redo_process.get_or_init_detached().await {
|
||||
Ok(guard) => {
|
||||
let (proc, permit) = guard.take_and_deinit();
|
||||
drop(proc); // this just drops the Arc, its refcount may not be zero yet
|
||||
permit
|
||||
}
|
||||
Err(permit) => permit,
|
||||
};
|
||||
self.redo_process
|
||||
.set(RedoProcessState::ManagerShutDown, permit);
|
||||
|
||||
// wait for all WalRedoProcess objects to get dropped
|
||||
self.launch_process_gate.close().await;
|
||||
}
|
||||
|
||||
/// This type doesn't have its own background task to check for idleness: we
|
||||
/// rely on our owner calling this function periodically in its own housekeeping
|
||||
/// loops.
|
||||
@@ -249,7 +273,12 @@ impl PostgresRedoManager {
|
||||
loop {
|
||||
let proc: Arc<process::WalRedoProcess> =
|
||||
match self.redo_process.get_or_init_detached().await {
|
||||
Ok(guard) => Arc::clone(&guard),
|
||||
Ok(guard) => match &*guard {
|
||||
RedoProcessState::Launched(proc) => Arc::clone(proc),
|
||||
RedoProcessState::ManagerShutDown => {
|
||||
return Err(Error::Cancelled);
|
||||
}
|
||||
},
|
||||
Err(permit) => {
|
||||
// don't hold poison_guard, the launch code can bail
|
||||
let start = Instant::now();
|
||||
@@ -273,7 +302,8 @@ impl PostgresRedoManager {
|
||||
pid = proc.id(),
|
||||
"launched walredo process"
|
||||
);
|
||||
self.redo_process.set(Arc::clone(&proc), permit);
|
||||
self.redo_process
|
||||
.set(RedoProcessState::Launched(Arc::clone(&proc)), permit);
|
||||
proc
|
||||
}
|
||||
};
|
||||
@@ -348,12 +378,17 @@ impl PostgresRedoManager {
|
||||
match self.redo_process.get() {
|
||||
None => (),
|
||||
Some(guard) => {
|
||||
if Arc::ptr_eq(&proc, &*guard) {
|
||||
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
|
||||
guard.take_and_deinit();
|
||||
} else {
|
||||
// Another task already spawned another redo process (further up in this method)
|
||||
// and put it into `redo_process`. Do nothing, our view of the world is behind.
|
||||
match &*guard {
|
||||
RedoProcessState::ManagerShutDown => {}
|
||||
RedoProcessState::Launched(guard_proc) => {
|
||||
if Arc::ptr_eq(&proc, &*guard_proc) {
|
||||
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
|
||||
guard.take_and_deinit();
|
||||
} else {
|
||||
// Another task already spawned another redo process (further up in this method)
|
||||
// and put it into `redo_process`. Do nothing, our view of the world is behind.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user