From cf75eb7d8669b63cb2d7c53fdddf93ecef96f357 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 16 Jan 2025 16:46:49 +0100 Subject: [PATCH] Revert "hacky experiment: what if we had more walredo procs => doesn't move the needle on throughput" This reverts commit 9fffe6e60d700a65caf4978a333a44f3df7ea44f. --- pageserver/src/walredo.rs | 57 +++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 32 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 820d18455d..027a6eb7d7 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -78,7 +78,7 @@ pub struct PostgresRedoManager { /// # Shutdown /// /// See [`Self::launched_processes`]. - redo_process: [heavier_once_cell::OnceCell; 4], + redo_process: heavier_once_cell::OnceCell, /// Gate that is entered when launching a walredo process and held open /// until the process has been `kill()`ed and `wait()`ed upon. @@ -231,7 +231,7 @@ impl PostgresRedoManager { chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?) }) }, - process: self.redo_process[0].get().and_then(|p| match &*p { + process: self.redo_process.get().and_then(|p| match &*p { ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }), ProcessOnceCell::ManagerShutDown => None, }), @@ -252,7 +252,7 @@ impl PostgresRedoManager { tenant_shard_id, conf, last_redo_at: std::sync::Mutex::default(), - redo_process: Default::default(), + redo_process: heavier_once_cell::OnceCell::default(), launched_processes: utils::sync::gate::Gate::default(), } } @@ -273,27 +273,25 @@ impl PostgresRedoManager { /// This method is cancellation-safe. pub async fn shutdown(&self) -> bool { // prevent new processes from being spawned - let mut it_was_us = false; - for proc in &self.redo_process { - let maybe_permit = match proc.get_or_init_detached().await { - Ok(guard) => { - if matches!(&*guard, ProcessOnceCell::ManagerShutDown) { - None - } else { - let (proc, permit) = guard.take_and_deinit(); - drop(proc); // this just drops the Arc, its refcount may not be zero yet - Some(permit) - } + let maybe_permit = match self.redo_process.get_or_init_detached().await { + Ok(guard) => { + if matches!(&*guard, ProcessOnceCell::ManagerShutDown) { + None + } else { + let (proc, permit) = guard.take_and_deinit(); + drop(proc); // this just drops the Arc, its refcount may not be zero yet + Some(permit) } - Err(permit) => Some(permit), - }; - it_was_us |= if let Some(permit) = maybe_permit { - proc.set(ProcessOnceCell::ManagerShutDown, permit); - true - } else { - false - }; - } + } + Err(permit) => Some(permit), + }; + let it_was_us = if let Some(permit) = maybe_permit { + self.redo_process + .set(ProcessOnceCell::ManagerShutDown, permit); + true + } else { + false + }; // wait for ongoing requests to drain and the refcounts of all Arc that // we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s // for the underlying process. @@ -309,9 +307,7 @@ impl PostgresRedoManager { if let Some(last_redo_at) = *g { if last_redo_at.elapsed() >= idle_timeout { drop(g); - for proc in &self.redo_process { - drop(proc.get().map(|guard| guard.take_and_deinit())); - } + drop(self.redo_process.get().map(|guard| guard.take_and_deinit())); } } } @@ -329,10 +325,7 @@ impl PostgresRedoManager { pg_version: u32, closure: F, ) -> Result { - static NEXT_PROC: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0); - let proc_idx = - NEXT_PROC.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.redo_process.len(); - let proc: Arc = match self.redo_process[proc_idx].get_or_init_detached().await { + let proc: Arc = match self.redo_process.get_or_init_detached().await { Ok(guard) => match &*guard { ProcessOnceCell::Spawned(proc) => Arc::clone(proc), ProcessOnceCell::ManagerShutDown => { @@ -365,7 +358,7 @@ impl PostgresRedoManager { pid = proc.id(), "launched walredo process" ); - self.redo_process[proc_idx] + self.redo_process .set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit); proc } @@ -390,7 +383,7 @@ impl PostgresRedoManager { // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here, // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads. // This probably needs revisiting at some later point. - match self.redo_process[proc_idx].get() { + match self.redo_process.get() { None => (), Some(guard) => { match &*guard {