From 79bd6f8d444789de26ea3ecdb0daf6646b6787d9 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 18 Oct 2024 18:43:27 +0200 Subject: [PATCH] pageserver: refactor multixact records --- pageserver/src/walingest.rs | 133 ++++++++++++++++++++++++------------ 1 file changed, 90 insertions(+), 43 deletions(-) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index e94fba38c6..267a9e9553 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -189,6 +189,18 @@ struct XactPrepare { data: Bytes, } +enum MultiXactRecord { + ZeroPage(MultiXactZeroPage), + Create(XlMultiXactCreate), + Truncate(XlMultiXactTruncate), +} + +struct MultiXactZeroPage { + slru_kind: SlruKind, + segno: u32, + rpageno: u32, +} + impl WalIngest { pub async fn new( timeline: &Timeline, @@ -342,49 +354,22 @@ impl WalIngest { } } pg_constants::RM_MULTIXACT_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { - let pageno = if pg_version < 17 { - buf.get_u32_le() - } else { - buf.get_u64_le() as u32 - }; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - self.put_slru_page_image( - modification, - SlruKind::MultiXactOffsets, - segno, - rpageno, - ZERO_PAGE.clone(), - ctx, - ) - .await?; - } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { - let pageno = if pg_version < 17 { - buf.get_u32_le() - } else { - buf.get_u64_le() as u32 - }; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - self.put_slru_page_image( - modification, - SlruKind::MultiXactMembers, - segno, - rpageno, - ZERO_PAGE.clone(), - ctx, - ) - .await?; - } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(&mut buf); - self.ingest_multixact_create_record(modification, &xlrec)?; - } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { - let xlrec = XlMultiXactTruncate::decode(&mut buf); - self.ingest_multixact_truncate_record(modification, &xlrec, ctx) - .await?; + let maybe_multixact_record = + Self::decode_multixact_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(multixact_record) = maybe_multixact_record { + match multixact_record { + MultiXactRecord::ZeroPage(zero_page) => { + self.ingest_multixact_zero_page_record(zero_page, modification, ctx) + .await?; + } + MultiXactRecord::Create(create) => { + self.ingest_multixact_create_record(modification, &create)?; + } + MultiXactRecord::Truncate(truncate) => { + self.ingest_multixact_truncate_record(modification, &truncate, ctx) + .await?; + } + } } } pg_constants::RM_RELMAP_ID => { @@ -2126,6 +2111,68 @@ impl WalIngest { Ok(()) } + async fn ingest_multixact_zero_page_record( + &mut self, + zero_page: MultiXactZeroPage, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> Result<()> { + let MultiXactZeroPage { + slru_kind, + segno, + rpageno, + } = zero_page; + self.put_slru_page_image( + modification, + slru_kind, + segno, + rpageno, + ZERO_PAGE.clone(), + ctx, + ) + .await + } + + fn decode_multixact_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE + || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE + { + let pageno = if pg_version < 17 { + buf.get_u32_le() + } else { + buf.get_u64_le() as u32 + }; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + let slru_kind = match info { + pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets, + pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers, + _ => unreachable!(), + }; + + return Ok(Some(MultiXactRecord::ZeroPage(MultiXactZeroPage { + slru_kind, + segno, + rpageno, + }))); + } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(buf); + return Ok(Some(MultiXactRecord::Create(xlrec))); + } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { + let xlrec = XlMultiXactTruncate::decode(buf); + return Ok(Some(MultiXactRecord::Truncate(xlrec))); + } + + Ok(None) + } + async fn ingest_relmap_page( &mut self, modification: &mut DatadirModification<'_>,