From ceadcc4f35e4faee666dcda84377a9cede9e8e51 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 16 Oct 2024 17:52:00 +0200 Subject: [PATCH] pageserver: refactor smgr records --- pageserver/src/walingest.rs | 158 ++++++++++++++++++++++++------------ 1 file changed, 108 insertions(+), 50 deletions(-) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index dd1f18440f..459e4d249d 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -121,6 +121,20 @@ struct NeonrmgrRecord { flags: u8, } +enum SmgrRecord { + Create(SmgrCreate), + Truncate(SmgrTruncate), +} + +struct SmgrCreate { + rel: RelTag, +} + +struct SmgrTruncate { + rel: RelTag, + to: BlockNumber, +} + impl WalIngest { pub async fn new( timeline: &Timeline, @@ -213,16 +227,19 @@ impl WalIngest { } // Handle other special record types pg_constants::RM_SMGR_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_SMGR_CREATE { - let create = XlSmgrCreate::decode(&mut buf); - self.ingest_xlog_smgr_create(modification, &create, ctx) - .await?; - } else if info == pg_constants::XLOG_SMGR_TRUNCATE { - let truncate = XlSmgrTruncate::decode(&mut buf); - self.ingest_xlog_smgr_truncate(modification, &truncate, ctx) - .await?; + let maybe_smgr_record = + Self::decode_smgr_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(smgr_record) = maybe_smgr_record { + match smgr_record { + SmgrRecord::Create(create) => { + self.ingest_xlog_smgr_create(create, modification, ctx) + .await?; + } + SmgrRecord::Truncate(truncate) => { + self.ingest_xlog_smgr_truncate(truncate, modification, ctx) + .await?; + } + } } } pg_constants::RM_DBASE_ID => { @@ -1412,54 +1429,102 @@ impl WalIngest { async fn ingest_xlog_smgr_create( &mut self, + create: SmgrCreate, modification: &mut DatadirModification<'_>, - rec: &XlSmgrCreate, ctx: &RequestContext, ) -> anyhow::Result<()> { - let rel = RelTag { - spcnode: rec.rnode.spcnode, - dbnode: 
rec.rnode.dbnode, - relnode: rec.rnode.relnode, - forknum: rec.forknum, - }; + let SmgrCreate { rel } = create; self.put_rel_creation(modification, rel, ctx).await?; Ok(()) } + fn decode_smgr_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result<Option<SmgrRecord>> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_SMGR_CREATE { + let create = XlSmgrCreate::decode(buf); + let rel = RelTag { + spcnode: create.rnode.spcnode, + dbnode: create.rnode.dbnode, + relnode: create.rnode.relnode, + forknum: create.forknum, + }; + + return Ok(Some(SmgrRecord::Create(SmgrCreate { rel }))); + } else if info == pg_constants::XLOG_SMGR_TRUNCATE { + let truncate = XlSmgrTruncate::decode(buf); + + let spcnode = truncate.rnode.spcnode; + let dbnode = truncate.rnode.dbnode; + let relnode = truncate.rnode.relnode; + + if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 { + let rel = RelTag { + spcnode, + dbnode, + relnode, + forknum: MAIN_FORKNUM, + }; + + return Ok(Some(SmgrRecord::Truncate(SmgrTruncate { + rel, + to: truncate.blkno, + }))); + } + + if (truncate.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 { + let rel = RelTag { + spcnode, + dbnode, + relnode, + forknum: FSM_FORKNUM, + }; + + return Ok(Some(SmgrRecord::Truncate(SmgrTruncate { + rel, + to: truncate.blkno, + }))); + } + + if (truncate.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 { + let rel = RelTag { + spcnode, + dbnode, + relnode, + forknum: VISIBILITYMAP_FORKNUM, + }; + + return Ok(Some(SmgrRecord::Truncate(SmgrTruncate { + rel, + to: truncate.blkno, + }))); + } + } + + Ok(None) + } + /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record. /// /// This is the same logic as in PostgreSQL's smgr_redo() function. 
async fn ingest_xlog_smgr_truncate( &mut self, + truncate: SmgrTruncate, modification: &mut DatadirModification<'_>, - rec: &XlSmgrTruncate, ctx: &RequestContext, ) -> anyhow::Result<()> { - let spcnode = rec.rnode.spcnode; - let dbnode = rec.rnode.dbnode; - let relnode = rec.rnode.relnode; + let SmgrTruncate { rel, to } = truncate; - if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 { - let rel = RelTag { - spcnode, - dbnode, - relnode, - forknum: MAIN_FORKNUM, - }; - self.put_rel_truncation(modification, rel, rec.blkno, ctx) - .await?; + if rel.forknum == MAIN_FORKNUM { + self.put_rel_truncation(modification, rel, to, ctx).await?; } - if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 { - let rel = RelTag { - spcnode, - dbnode, - relnode, - forknum: FSM_FORKNUM, - }; - - let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE; + if rel.forknum == FSM_FORKNUM { + let fsm_logical_page_no = to / pg_constants::SLOTS_PER_FSM_PAGE; let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no); - if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 { + if to % pg_constants::SLOTS_PER_FSM_PAGE != 0 { // Tail of last remaining FSM page has to be zeroed. // We are not precise here and instead of digging in FSM bitmap format just clear the whole page. modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?; @@ -1472,16 +1537,9 @@ impl WalIngest { .await?; } } - if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 { - let rel = RelTag { - spcnode, - dbnode, - relnode, - forknum: VISIBILITYMAP_FORKNUM, - }; - - let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE; - if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 { + if rel.forknum == VISIBILITYMAP_FORKNUM { + let mut vm_page_no = to / pg_constants::VM_HEAPBLOCKS_PER_PAGE; + if to % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 { // Tail of last remaining vm page has to be zeroed. 
// We are not precise here and instead of digging in VM bitmap format just clear the whole page. modification.put_rel_page_image_zero(rel, vm_page_no)?;