run correct pg_version walredo against the walrecords

This commit is contained in:
Joonas Koivunen
2024-09-12 13:18:48 +00:00
parent 08705d1b8c
commit 7b6a888c24

View File

@@ -5096,33 +5096,130 @@ impl Timeline {
//
// If we don't have a base image, then the oldest WAL record better initialize
// the page
if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
Err(PageReconstructError::from(anyhow!(
"Base image for {} at {} not found, but got {} WAL records",
key,
request_lsn,
data.records.len()
)))
} else {
if data.img.is_some() {
let have_img = data.img.is_some();
let will_init = data
.records
.first()
.map(|(_, rec)| rec.will_init())
.expect("already checked to have records");
match (have_img, will_init) {
(false, false) => {
return Err(PageReconstructError::from(anyhow!(
"Base image for {} at {} not found, but got {} WAL records",
key,
request_lsn,
data.records.len()
)))
}
(true, _) => {
trace!(
"found {} WAL records and a base image for {} at {}, performing WAL redo",
data.records.len(),
key,
request_lsn
);
} else {
}
(false, _) => {
assert!(will_init, "already checked above");
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
};
let res = self
.walredo_mgr
.as_ref()
.context("timeline has no walredo manager")
.map_err(PageReconstructError::WalRedo)?
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
}
}
let oldest_lsn = data
.records
.first()
.map(|(lsn, _)| lsn)
.expect("again, checked");
// walk up the ancestry until we have found an ancestor covering the lsn range
let ancestry = std::iter::successors(Some(self), |tl| tl.ancestor_timeline.as_deref())
// 100 - initdb R pg14
// 150 - branch S pg14
// 200 - branch T pg15
// 250 - branch U pg15
// 300 - branch V pg16
//
// oldest_lsn = 155:
// get [V pg16, U pg15(one_more=true), T pg15(one_more=true), S pg14(one_more=false)]
.take_while({
let mut one_more = true;
move |tl| {
if *oldest_lsn < tl.ancestor_lsn {
assert!(one_more);
true
} else {
let prev = one_more;
one_more = false;
prev
}
}
})
// remove consecutive same pg_versions, which might be all in case we can use the
// same timeline for all reconstruction.
// [V pg16, U pg15, T pg15, S pg14] => [V pg16, T pg15, S pg14]
.fold(Vec::<&Timeline>::with_capacity(4), |mut acc, next| {
if acc
.last()
.map(|tl| tl.pg_version == next.pg_version)
.unwrap_or(false)
{
// overwrite with an earlier timeline; additionally we only allow upgrades,
// so we cannot go backwards like pg14 (branch) pg15 (branch) pg14
*acc.last_mut().unwrap() = next;
} else {
acc.push(next);
}
acc
});
// shifted for the purpose of timeline_pairs
let later_timelines = ancestry
.iter()
.rev()
.skip(1)
.map(Some)
.chain(std::iter::once(None));
// zip older and later timelines into pair, which we then use to select parts of
// wal records to be executed on which version walredo
let timeline_pairs = ancestry.iter().rev().zip(later_timelines);
let mgr = self
.walredo_mgr
.as_ref()
.context("timeline has no walredo manager")
.map_err(PageReconstructError::WalRedo)?;
let mut img = data.img.clone();
let mut records = &data.records[..];
for (older, later) in timeline_pairs {
let scratch = records
.iter()
.take_while(|(lsn, _)| {
// if there is no later, take all remaining
later.map(|later| lsn < &later.ancestor_lsn).unwrap_or(true)
})
.cloned()
.collect::<Vec<_>>();
records = &records[scratch.len()..];
if later.is_none() {
assert!(records.is_empty());
}
// this is only used for logging on the next round
let last_lsn = scratch.last().unwrap().0;
// is request_lsn ok? it's not used for anything important, just logging.
let res = mgr
.request_redo(key, request_lsn, img, scratch, older.pg_version)
.await;
let img = match res {
Ok(img) => img,
img = match res {
Ok(img) => Some((last_lsn, img)),
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
Err(walredo::Error::Other(e)) => {
return Err(PageReconstructError::WalRedo(
@@ -5130,8 +5227,9 @@ impl Timeline {
))
}
};
Ok(img)
}
Ok(img.unwrap().1)
}
}