From 345ae45faaa64c17375943997cdca45c1cdc400b Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Sat, 19 Oct 2024 14:36:58 +0200 Subject: [PATCH] pageserver: refactor replorigin record --- pageserver/src/walingest.rs | 55 +++++++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 1e8a1ee97e..038176f787 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -225,6 +225,11 @@ struct StandbyRunningXacts { oldest_running_xid: u32, } +enum ReploriginRecord { + Set(XlReploriginSet), + Drop(XlReploriginDrop), +} + impl WalIngest { pub async fn new( timeline: &Timeline, @@ -433,15 +438,11 @@ impl WalIngest { } } pg_constants::RM_REPLORIGIN_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_REPLORIGIN_SET { - let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf); - modification - .set_replorigin(xlrec.node_id, xlrec.remote_lsn) - .await? - } else if info == pg_constants::XLOG_REPLORIGIN_DROP { - let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf); - modification.drop_replorigin(xlrec.node_id).await? + let maybe_replorigin_record = + Self::decode_replorigin_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(replorigin_record) = maybe_replorigin_record { + self.ingest_replorigin_record(replorigin_record, modification) + .await?; } } _x => { @@ -2325,6 +2326,42 @@ impl WalIngest { Ok(()) } + fn decode_replorigin_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_REPLORIGIN_SET { + let xlrec = crate::walrecord::XlReploriginSet::decode(buf); + return Ok(Some(ReploriginRecord::Set(xlrec))); + } else if info == pg_constants::XLOG_REPLORIGIN_DROP { + let xlrec = crate::walrecord::XlReploriginDrop::decode(buf); + return Ok(Some(ReploriginRecord::Drop(xlrec))); + } + + Ok(None) + } + + async fn ingest_replorigin_record( + &mut self, + record: ReploriginRecord, + modification: &mut DatadirModification<'_>, + ) -> Result<()> { + match record { + ReploriginRecord::Set(set) => { + modification + .set_replorigin(set.node_id, set.remote_lsn) + .await?; + } + ReploriginRecord::Drop(drop) => { + modification.drop_replorigin(drop.node_id).await?; + } + } + + Ok(()) + } + async fn put_rel_creation( &mut self, modification: &mut DatadirModification<'_>,