hacky experiment: what if we had more walredo procs => doesn't move the needle on throughput

This commit is contained in:
Christian Schwarz
2025-01-16 13:58:23 +01:00
parent 2ff0a4ae82
commit 9fffe6e60d

View File

@@ -78,7 +78,7 @@ pub struct PostgresRedoManager {
/// # Shutdown
///
/// See [`Self::launched_processes`].
redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,
redo_process: [heavier_once_cell::OnceCell<ProcessOnceCell>; 4],
/// Gate that is entered when launching a walredo process and held open
/// until the process has been `kill()`ed and `wait()`ed upon.
@@ -231,7 +231,7 @@ impl PostgresRedoManager {
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
})
},
process: self.redo_process.get().and_then(|p| match &*p {
process: self.redo_process[0].get().and_then(|p| match &*p {
ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
ProcessOnceCell::ManagerShutDown => None,
}),
@@ -252,7 +252,7 @@ impl PostgresRedoManager {
tenant_shard_id,
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: heavier_once_cell::OnceCell::default(),
redo_process: Default::default(),
launched_processes: utils::sync::gate::Gate::default(),
}
}
@@ -273,25 +273,27 @@ impl PostgresRedoManager {
/// This method is cancellation-safe.
pub async fn shutdown(&self) -> bool {
// prevent new processes from being spawned
let maybe_permit = match self.redo_process.get_or_init_detached().await {
Ok(guard) => {
if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
None
} else {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
Some(permit)
let mut it_was_us = false;
for proc in &self.redo_process {
let maybe_permit = match proc.get_or_init_detached().await {
Ok(guard) => {
if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
None
} else {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
Some(permit)
}
}
}
Err(permit) => Some(permit),
};
let it_was_us = if let Some(permit) = maybe_permit {
self.redo_process
.set(ProcessOnceCell::ManagerShutDown, permit);
true
} else {
false
};
Err(permit) => Some(permit),
};
it_was_us |= if let Some(permit) = maybe_permit {
proc.set(ProcessOnceCell::ManagerShutDown, permit);
true
} else {
false
};
}
// wait for ongoing requests to drain and the refcounts of all Arc<WalRedoProcess> that
// we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s
// for the underlying process.
@@ -307,7 +309,9 @@ impl PostgresRedoManager {
if let Some(last_redo_at) = *g {
if last_redo_at.elapsed() >= idle_timeout {
drop(g);
drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
for proc in &self.redo_process {
drop(proc.get().map(|guard| guard.take_and_deinit()));
}
}
}
}
@@ -325,7 +329,10 @@ impl PostgresRedoManager {
pg_version: u32,
closure: F,
) -> Result<O, Error> {
let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
static NEXT_PROC: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
let proc_idx =
NEXT_PROC.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.redo_process.len();
let proc: Arc<Process> = match self.redo_process[proc_idx].get_or_init_detached().await {
Ok(guard) => match &*guard {
ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
ProcessOnceCell::ManagerShutDown => {
@@ -358,7 +365,7 @@ impl PostgresRedoManager {
pid = proc.id(),
"launched walredo process"
);
self.redo_process
self.redo_process[proc_idx]
.set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
proc
}
@@ -383,7 +390,7 @@ impl PostgresRedoManager {
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
// This probably needs revisiting at some later point.
match self.redo_process.get() {
match self.redo_process[proc_idx].get() {
None => (),
Some(guard) => {
match &*guard {