pageserver: refactor standby record

This commit is contained in:
Vlad Lazar
2024-10-19 14:29:22 +02:00
parent 1836cb23ff
commit 59d8016c5a

View File

@@ -216,6 +216,15 @@ struct LogicalMessageRecord {
buf: Bytes,
prefix_size: usize,
}
enum StandbyRecord {
RunningXacts(StandbyRunningXacts),
}
struct StandbyRunningXacts {
oldest_running_xid: u32,
}
impl WalIngest {
pub async fn new(
timeline: &Timeline,
@@ -417,15 +426,10 @@ impl WalIngest {
}
}
pg_constants::RM_STANDBY_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestActiveXid = xlrec.oldest_running_xid;
});
self.checkpoint_modified = true;
let maybe_standby_record =
Self::decode_standby_record(&mut buf, &decoded, pg_version).unwrap();
if let Some(standby_record) = maybe_standby_record {
self.ingest_standby_record(standby_record).unwrap();
}
}
pg_constants::RM_REPLORIGIN_ID => {
@@ -2290,6 +2294,37 @@ impl WalIngest {
Ok(None)
}
fn decode_standby_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
_pg_version: u32,
) -> anyhow::Result<Option<StandbyRecord>> {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(buf);
return Ok(Some(StandbyRecord::RunningXacts(StandbyRunningXacts {
oldest_running_xid: xlrec.oldest_running_xid,
})));
}
Ok(None)
}
fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> {
match record {
StandbyRecord::RunningXacts(running_xacts) => {
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestActiveXid = running_xacts.oldest_running_xid;
});
self.checkpoint_modified = true;
}
}
Ok(())
}
async fn put_rel_creation(
&mut self,
modification: &mut DatadirModification<'_>,