diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 262dccac7d..6809cb1da4 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5078,14 +5078,14 @@ impl Timeline { // If we have a page image, and no WAL, we're all set if data.records.is_empty() { - if let Some((img_lsn, img)) = &data.img { + if let Some((img_lsn, img)) = data.img { trace!( "found page image for key {} at {}, no WAL redo required, req LSN {}", key, img_lsn, request_lsn, ); - Ok(img.clone()) + Ok(img) } else { Err(PageReconstructError::from(anyhow!( "base image for {key} at {request_lsn} not found" @@ -5096,33 +5096,130 @@ impl Timeline { // // If we don't have a base image, then the oldest WAL record better initialize // the page - if data.img.is_none() && !data.records.first().unwrap().1.will_init() { - Err(PageReconstructError::from(anyhow!( - "Base image for {} at {} not found, but got {} WAL records", - key, - request_lsn, - data.records.len() - ))) - } else { - if data.img.is_some() { + + let have_img = data.img.is_some(); + let will_init = data + .records + .first() + .map(|(_, rec)| rec.will_init()) + .expect("already checked to have records"); + + match (have_img, will_init) { + (false, false) => { + return Err(PageReconstructError::from(anyhow!( + "Base image for {} at {} not found, but got {} WAL records", + key, + request_lsn, + data.records.len() + ))) + } + (true, _) => { trace!( "found {} WAL records and a base image for {} at {}, performing WAL redo", data.records.len(), key, request_lsn ); - } else { + } + (false, _) => { + assert!(will_init, "already checked above"); trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn); - }; - let res = self - .walredo_mgr - .as_ref() - .context("timeline has no walredo manager") - .map_err(PageReconstructError::WalRedo)? - .request_redo(key, request_lsn, data.img, data.records, self.pg_version) + } + } + + let oldest_lsn = data + .records + .first() + .map(|(lsn, _)| lsn) + .expect("again, checked"); + + // walk up the ancestry until we have found an ancestor covering the lsn range + let ancestry = std::iter::successors(Some(self), |tl| tl.ancestor_timeline.as_deref()) + // 100 - initdb R pg14 + // 150 - branch S pg14 + // 200 - branch T pg15 + // 250 - branch U pg15 + // 300 - branch V pg16 + // + // oldest_lsn = 155: + // get [V pg16, U pg15(one_more=true), T pg15(one_more=true), S pg14(one_more=false)] + .take_while({ + let mut one_more = true; + + move |tl| { + if *oldest_lsn < tl.ancestor_lsn { + assert!(one_more); + true + } else { + let prev = one_more; + one_more = false; + prev + } + } + }) + // remove consecutive same pg_versions, which might be all in case we can use the + // same timeline for all reconstruction. + // [V pg16, U pg15, T pg15, S pg14] => [V pg16, T pg15, S pg14] + .fold(Vec::<&Timeline>::with_capacity(4), |mut acc, next| { + if acc + .last() + .map(|tl| tl.pg_version == next.pg_version) + .unwrap_or(false) + { + // overwrite with an earlier timeline; additionally we only allow upgrades, + // so we cannot go backwards like pg14 (branch) pg15 (branch) pg14 + *acc.last_mut().unwrap() = next; + } else { + acc.push(next); + } + acc + }); + + // shifted for the purpose of timeline_pairs + let later_timelines = ancestry + .iter() + .rev() + .skip(1) + .map(Some) + .chain(std::iter::once(None)); + + // zip older and later timelines into pair, which we then use to select parts of + // wal records to be executed on which version walredo + let timeline_pairs = ancestry.iter().rev().zip(later_timelines); + + let mgr = self + .walredo_mgr + .as_ref() + .context("timeline has no walredo manager") + .map_err(PageReconstructError::WalRedo)?; + + let mut img = data.img.clone(); + let mut records = &data.records[..]; + + for (older, later) in timeline_pairs { + let scratch = records + .iter() + .take_while(|(lsn, _)| { + // if there is no later, take all remaining + later.map(|later| lsn < &later.ancestor_lsn).unwrap_or(true) + }) + .cloned() + .collect::>(); + records = &records[scratch.len()..]; + if later.is_none() { + assert!(records.is_empty()); + } + + // this is only used for logging on the next round + let last_lsn = scratch.last().unwrap().0; + + // is request_lsn ok? it's not used for anything important, just logging. + let res = mgr + .request_redo(key, request_lsn, img, scratch, older.pg_version) .await; - let img = match res { - Ok(img) => img, + + img = match res { + Ok(img) => Some((last_lsn, img)), Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled), Err(walredo::Error::Other(e)) => { return Err(PageReconstructError::WalRedo( @@ -5130,8 +5227,9 @@ impl Timeline { )) } }; - Ok(img) } + + Ok(img.unwrap().1) } } diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 82585f9ed8..384355b20a 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -80,6 +80,8 @@ pub struct PostgresRedoManager { /// See [`Self::launched_processes`]. redo_process: heavier_once_cell::OnceCell, + processes: [heavier_once_cell::OnceCell; 4], + /// Gate that is entered when launching a walredo process and held open /// until the process has been `kill()`ed and `wait()`ed upon. /// @@ -215,10 +217,18 @@ impl PostgresRedoManager { chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?) }) }, - process: self.redo_process.get().and_then(|p| match &*p { - ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }), - ProcessOnceCell::ManagerShutDown => None, - }), + process: self + .processes + .iter() + .filter_map(|p| { + p.get().and_then(|p| match &*p { + ProcessOnceCell::Spawned(p) => { + Some(WalRedoManagerProcessStatus { pid: p.id() }) + } + ProcessOnceCell::ManagerShutDown => None, + }) + }) + .next(), } } } @@ -237,6 +247,7 @@ impl PostgresRedoManager { conf, last_redo_at: std::sync::Mutex::default(), redo_process: heavier_once_cell::OnceCell::default(), + processes: Default::default(), launched_processes: utils::sync::gate::Gate::default(), } } @@ -256,26 +267,31 @@ impl PostgresRedoManager { /// /// This method is cancellation-safe. pub async fn shutdown(&self) -> bool { - // prevent new processes from being spawned - let maybe_permit = match self.redo_process.get_or_init_detached().await { - Ok(guard) => { - if matches!(&*guard, ProcessOnceCell::ManagerShutDown) { - None - } else { - let (proc, permit) = guard.take_and_deinit(); - drop(proc); // this just drops the Arc, its refcount may not be zero yet - Some(permit) + let mut it_was_us = false; + for process in self.processes.iter() { + // prevent new processes from being spawned + let maybe_permit = match self.redo_process.get_or_init_detached().await { + Ok(guard) => { + if matches!(&*guard, ProcessOnceCell::ManagerShutDown) { + None + } else { + let (proc, permit) = guard.take_and_deinit(); + drop(proc); // this just drops the Arc, its refcount may not be zero yet + Some(permit) + } } - } - Err(permit) => Some(permit), - }; - let it_was_us = if let Some(permit) = maybe_permit { - self.redo_process - .set(ProcessOnceCell::ManagerShutDown, permit); - true - } else { - false - }; + Err(permit) => Some(permit), + }; + let i_cant_see_why_this = if let Some(permit) = maybe_permit { + process.set(ProcessOnceCell::ManagerShutDown, permit); + true + } else { + false + }; + + // TODO: or is correct? + it_was_us |= i_cant_see_why_this; + } // wait for ongoing requests to drain and the refcounts of all Arc that // we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s // for the underlying process. @@ -291,7 +307,10 @@ impl PostgresRedoManager { if let Some(last_redo_at) = *g { if last_redo_at.elapsed() >= idle_timeout { drop(g); - drop(self.redo_process.get().map(|guard| guard.take_and_deinit())); + + self.processes.iter().for_each(|c| { + drop(c.get().map(|guard| guard.take_and_deinit())); + }) } } } @@ -314,13 +333,23 @@ impl PostgresRedoManager { wal_redo_timeout: Duration, pg_version: u32, ) -> Result { + assert!( + (14..=17).contains(&pg_version), + "this should be an enum, but no: {pg_version}" + ); *(self.last_redo_at.lock().unwrap()) = Some(Instant::now()); let (rel, blknum) = key.to_rel_block().context("invalid record")?; const MAX_RETRY_ATTEMPTS: u32 = 1; let mut n_attempts = 0u32; loop { - let proc: Arc = match self.redo_process.get_or_init_detached().await { + // handling multiple processes idea: just support N versions here, but the caller + // splits per parent_lsn in the case that: + // - reconstruct_data spans two versions + // - reconstruct_data went to parent??? + let process = &self.processes[(pg_version - 14) as usize]; + + let proc: Arc = match process.get_or_init_detached().await { Ok(guard) => match &*guard { ProcessOnceCell::Spawned(proc) => Arc::clone(proc), ProcessOnceCell::ManagerShutDown => { @@ -332,11 +361,11 @@ impl PostgresRedoManager { // acquire guard before spawning process, so that we don't spawn new processes // if the gate is already closed. let _launched_processes_guard = match self.launched_processes.enter() { - Ok(guard) => guard, - Err(GateError::GateClosed) => unreachable!( - "shutdown sets the once cell to `ManagerShutDown` state before closing the gate" - ), - }; + Ok(guard) => guard, + Err(GateError::GateClosed) => unreachable!( + "shutdown sets the once cell to `ManagerShutDown` state before closing the gate" + ), + }; let proc = Arc::new(Process { process: process::WalRedoProcess::launch( self.conf, @@ -353,8 +382,7 @@ impl PostgresRedoManager { pid = proc.id(), "launched walredo process" ); - self.redo_process - .set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit); + process.set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit); proc } }; @@ -419,7 +447,7 @@ impl PostgresRedoManager { // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here, // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads. // This probably needs revisiting at some later point. - match self.redo_process.get() { + match process.get() { None => (), Some(guard) => { match &*guard { @@ -448,9 +476,7 @@ impl PostgresRedoManager { } } - /// - /// Process a batch of WAL records using bespoken Neon code. - /// + /// Process a batch of WAL records using bespoke Neon code. fn apply_batch_neon( &self, key: Key, @@ -471,7 +497,7 @@ impl PostgresRedoManager { // Apply all the WAL records in the batch for (record_lsn, record) in records.iter() { - self.apply_record_neon(key, &mut page, *record_lsn, record)?; + apply_neon::apply_in_neon(record, *record_lsn, key, &mut page)?; } // Success! let duration = start_time.elapsed(); @@ -488,18 +514,6 @@ impl PostgresRedoManager { Ok(page.freeze()) } - - fn apply_record_neon( - &self, - key: Key, - page: &mut BytesMut, - record_lsn: Lsn, - record: &NeonWalRecord, - ) -> anyhow::Result<()> { - apply_neon::apply_in_neon(record, record_lsn, key, page)?; - - Ok(()) - } } #[cfg(test)]