walredo process per pg_version handling

This commit is contained in:
Joonas Koivunen
2024-09-12 13:31:13 +00:00
parent 7b6a888c24
commit 31ca007fb3

View File

@@ -80,6 +80,8 @@ pub struct PostgresRedoManager {
/// See [`Self::launched_processes`].
redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,
processes: [heavier_once_cell::OnceCell<ProcessOnceCell>; 4],
/// Gate that is entered when launching a walredo process and held open
/// until the process has been `kill()`ed and `wait()`ed upon.
///
@@ -215,10 +217,18 @@ impl PostgresRedoManager {
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
})
},
process: self.redo_process.get().and_then(|p| match &*p {
ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
ProcessOnceCell::ManagerShutDown => None,
}),
process: self
.processes
.iter()
.filter_map(|p| {
p.get().and_then(|p| match &*p {
ProcessOnceCell::Spawned(p) => {
Some(WalRedoManagerProcessStatus { pid: p.id() })
}
ProcessOnceCell::ManagerShutDown => None,
})
})
.next(),
}
}
}
@@ -237,6 +247,7 @@ impl PostgresRedoManager {
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: heavier_once_cell::OnceCell::default(),
processes: Default::default(),
launched_processes: utils::sync::gate::Gate::default(),
}
}
@@ -256,26 +267,31 @@ impl PostgresRedoManager {
///
/// This method is cancellation-safe.
pub async fn shutdown(&self) -> bool {
// prevent new processes from being spawned
let maybe_permit = match self.redo_process.get_or_init_detached().await {
Ok(guard) => {
if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
None
} else {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
Some(permit)
let mut it_was_us = false;
for process in self.processes.iter() {
// prevent new processes from being spawned
let maybe_permit = match self.redo_process.get_or_init_detached().await {
Ok(guard) => {
if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
None
} else {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
Some(permit)
}
}
}
Err(permit) => Some(permit),
};
let it_was_us = if let Some(permit) = maybe_permit {
self.redo_process
.set(ProcessOnceCell::ManagerShutDown, permit);
true
} else {
false
};
Err(permit) => Some(permit),
};
let i_cant_see_why_this = if let Some(permit) = maybe_permit {
process.set(ProcessOnceCell::ManagerShutDown, permit);
true
} else {
false
};
// TODO: or is correct?
it_was_us |= i_cant_see_why_this;
}
// wait for ongoing requests to drain and the refcounts of all Arc<WalRedoProcess> that
// we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s
// for the underlying process.
@@ -291,7 +307,10 @@ impl PostgresRedoManager {
if let Some(last_redo_at) = *g {
if last_redo_at.elapsed() >= idle_timeout {
drop(g);
drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
self.processes.iter().for_each(|c| {
drop(c.get().map(|guard| guard.take_and_deinit()));
})
}
}
}
@@ -314,13 +333,23 @@ impl PostgresRedoManager {
wal_redo_timeout: Duration,
pg_version: u32,
) -> Result<Bytes, Error> {
assert!(
(14..=17).contains(&pg_version),
"this should be an enum, but no: {pg_version}"
);
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
let (rel, blknum) = key.to_rel_block().context("invalid record")?;
const MAX_RETRY_ATTEMPTS: u32 = 1;
let mut n_attempts = 0u32;
loop {
let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
// handling multiple processes idea: just support N versions here, but the caller
// splits per parent_lsn in the case that:
// - reconstruct_data spans two versions
// - reconstruct_data went to parent???
let process = &self.processes[(pg_version - 14) as usize];
let proc: Arc<Process> = match process.get_or_init_detached().await {
Ok(guard) => match &*guard {
ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
ProcessOnceCell::ManagerShutDown => {
@@ -353,8 +382,7 @@ impl PostgresRedoManager {
pid = proc.id(),
"launched walredo process"
);
self.redo_process
.set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
process.set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
proc
}
};
@@ -419,7 +447,7 @@ impl PostgresRedoManager {
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
// This probably needs revisiting at some later point.
match self.redo_process.get() {
match process.get() {
None => (),
Some(guard) => {
match &*guard {