From 59d8016c5aec496030592d215364224f65ff56fd Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Sat, 19 Oct 2024 14:29:22 +0200 Subject: [PATCH] pageserver: refactor standby record --- pageserver/src/walingest.rs | 53 ++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 9 deletions(-) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 4121a8ad2f..ccd655a0d1 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -216,6 +216,15 @@ struct LogicalMessageRecord { buf: Bytes, prefix_size: usize, } + +enum StandbyRecord { + RunningXacts(StandbyRunningXacts), +} + +struct StandbyRunningXacts { + oldest_running_xid: u32, +} + impl WalIngest { pub async fn new( timeline: &Timeline, @@ -417,15 +426,10 @@ impl WalIngest { } } pg_constants::RM_STANDBY_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_RUNNING_XACTS { - let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf); - - enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { - cp.oldestActiveXid = xlrec.oldest_running_xid; - }); - - self.checkpoint_modified = true; + let maybe_standby_record = + Self::decode_standby_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(standby_record) = maybe_standby_record { + self.ingest_standby_record(standby_record).unwrap(); } } pg_constants::RM_REPLORIGIN_ID => { @@ -2290,6 +2294,37 @@ impl WalIngest { Ok(None) } + + fn decode_standby_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_RUNNING_XACTS { + let xlrec = crate::walrecord::XlRunningXacts::decode(buf); + return Ok(Some(StandbyRecord::RunningXacts(StandbyRunningXacts { + oldest_running_xid: xlrec.oldest_running_xid, + }))); + } + + Ok(None) + } + + fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> { + match record { + StandbyRecord::RunningXacts(running_xacts) => { + enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { + cp.oldestActiveXid = running_xacts.oldest_running_xid; + }); + + self.checkpoint_modified = true; + } + } + + Ok(()) + } + async fn put_rel_creation( &mut self, modification: &mut DatadirModification<'_>,