diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 531d3b9bf1..384355b20a 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -80,6 +80,8 @@ pub struct PostgresRedoManager { /// See [`Self::launched_processes`]. redo_process: heavier_once_cell::OnceCell, + processes: [heavier_once_cell::OnceCell; 4], + /// Gate that is entered when launching a walredo process and held open /// until the process has been `kill()`ed and `wait()`ed upon. /// @@ -215,10 +217,18 @@ impl PostgresRedoManager { chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?) }) }, - process: self.redo_process.get().and_then(|p| match &*p { - ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }), - ProcessOnceCell::ManagerShutDown => None, - }), + process: self + .processes + .iter() + .filter_map(|p| { + p.get().and_then(|p| match &*p { + ProcessOnceCell::Spawned(p) => { + Some(WalRedoManagerProcessStatus { pid: p.id() }) + } + ProcessOnceCell::ManagerShutDown => None, + }) + }) + .next(), } } } @@ -237,6 +247,7 @@ impl PostgresRedoManager { conf, last_redo_at: std::sync::Mutex::default(), redo_process: heavier_once_cell::OnceCell::default(), + processes: Default::default(), launched_processes: utils::sync::gate::Gate::default(), } } @@ -256,26 +267,31 @@ impl PostgresRedoManager { /// /// This method is cancellation-safe. 
pub async fn shutdown(&self) -> bool { - // prevent new processes from being spawned - let maybe_permit = match self.redo_process.get_or_init_detached().await { - Ok(guard) => { - if matches!(&*guard, ProcessOnceCell::ManagerShutDown) { - None - } else { - let (proc, permit) = guard.take_and_deinit(); - drop(proc); // this just drops the Arc, its refcount may not be zero yet - Some(permit) + let mut it_was_us = false; + for process in self.processes.iter() { + // prevent new processes from being spawned + let maybe_permit = match process.get_or_init_detached().await { + Ok(guard) => { + if matches!(&*guard, ProcessOnceCell::ManagerShutDown) { + None + } else { + let (proc, permit) = guard.take_and_deinit(); + drop(proc); // this just drops the Arc, its refcount may not be zero yet + Some(permit) + } } - } - Err(permit) => Some(permit), - }; - let it_was_us = if let Some(permit) = maybe_permit { - self.redo_process - .set(ProcessOnceCell::ManagerShutDown, permit); - true - } else { - false - }; + Err(permit) => Some(permit), + }; + let shut_down_here = if let Some(permit) = maybe_permit { + process.set(ProcessOnceCell::ManagerShutDown, permit); + true + } else { + false + }; + + // NOTE(review): OR-ing yields true if this call shut down at least one slot; confirm callers expect that. + it_was_us |= shut_down_here; + } // wait for ongoing requests to drain and the refcounts of all Arc that // we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s // for the underlying process. 
@@ -291,7 +307,10 @@ impl PostgresRedoManager { if let Some(last_redo_at) = *g { if last_redo_at.elapsed() >= idle_timeout { drop(g); - drop(self.redo_process.get().map(|guard| guard.take_and_deinit())); + + self.processes.iter().for_each(|c| { + drop(c.get().map(|guard| guard.take_and_deinit())); + }) } } } @@ -314,13 +333,23 @@ impl PostgresRedoManager { wal_redo_timeout: Duration, pg_version: u32, ) -> Result { + assert!( + (14..=17).contains(&pg_version), + "this should be an enum, but no: {pg_version}" + ); *(self.last_redo_at.lock().unwrap()) = Some(Instant::now()); let (rel, blknum) = key.to_rel_block().context("invalid record")?; const MAX_RETRY_ATTEMPTS: u32 = 1; let mut n_attempts = 0u32; loop { - let proc: Arc = match self.redo_process.get_or_init_detached().await { + // handling multiple processes idea: just support N versions here, but the caller + // splits per parent_lsn in the case that: + // - reconstruct_data spans two versions + // - reconstruct_data went to parent??? + let process = &self.processes[(pg_version - 14) as usize]; + + let proc: Arc = match process.get_or_init_detached().await { Ok(guard) => match &*guard { ProcessOnceCell::Spawned(proc) => Arc::clone(proc), ProcessOnceCell::ManagerShutDown => { @@ -353,8 +382,7 @@ impl PostgresRedoManager { pid = proc.id(), "launched walredo process" ); - self.redo_process - .set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit); + process.set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit); proc } }; @@ -419,7 +447,7 @@ impl PostgresRedoManager { // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here, // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads. // This probably needs revisiting at some later point. - match self.redo_process.get() { + match process.get() { None => (), Some(guard) => { match &*guard {