pageserver: refactor smgr records

This commit is contained in:
Vlad Lazar
2024-10-16 17:52:00 +02:00
parent ee2a962028
commit ceadcc4f35

View File

@@ -121,6 +121,20 @@ struct NeonrmgrRecord {
flags: u8,
}
enum SmgrRecord {
Create(SmgrCreate),
Truncate(SmgrTruncate),
}
struct SmgrCreate {
rel: RelTag,
}
struct SmgrTruncate {
rel: RelTag,
to: BlockNumber,
}
impl WalIngest {
pub async fn new(
timeline: &Timeline,
@@ -213,16 +227,19 @@ impl WalIngest {
}
// Handle other special record types
pg_constants::RM_SMGR_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_SMGR_CREATE {
let create = XlSmgrCreate::decode(&mut buf);
self.ingest_xlog_smgr_create(modification, &create, ctx)
.await?;
} else if info == pg_constants::XLOG_SMGR_TRUNCATE {
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
.await?;
let maybe_smgr_record =
Self::decode_smgr_record(&mut buf, &decoded, pg_version).unwrap();
if let Some(smgr_record) = maybe_smgr_record {
match smgr_record {
SmgrRecord::Create(create) => {
self.ingest_xlog_smgr_create(create, modification, ctx)
.await?;
}
SmgrRecord::Truncate(truncate) => {
self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
.await?;
}
}
}
}
pg_constants::RM_DBASE_ID => {
@@ -1412,54 +1429,102 @@ impl WalIngest {
async fn ingest_xlog_smgr_create(
&mut self,
create: SmgrCreate,
modification: &mut DatadirModification<'_>,
rec: &XlSmgrCreate,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let rel = RelTag {
spcnode: rec.rnode.spcnode,
dbnode: rec.rnode.dbnode,
relnode: rec.rnode.relnode,
forknum: rec.forknum,
};
let SmgrCreate { rel } = create;
self.put_rel_creation(modification, rel, ctx).await?;
Ok(())
}
fn decode_smgr_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
_pg_version: u32,
) -> anyhow::Result<Option<SmgrRecord>> {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_SMGR_CREATE {
let create = XlSmgrCreate::decode(buf);
let rel = RelTag {
spcnode: create.rnode.spcnode,
dbnode: create.rnode.dbnode,
relnode: create.rnode.relnode,
forknum: create.forknum,
};
return Ok(Some(SmgrRecord::Create(SmgrCreate { rel })));
} else if info == pg_constants::XLOG_SMGR_TRUNCATE {
let truncate = XlSmgrTruncate::decode(buf);
let spcnode = truncate.rnode.spcnode;
let dbnode = truncate.rnode.dbnode;
let relnode = truncate.rnode.relnode;
if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
let rel = RelTag {
spcnode,
dbnode,
relnode,
forknum: MAIN_FORKNUM,
};
return Ok(Some(SmgrRecord::Truncate(SmgrTruncate {
rel,
to: truncate.blkno,
})));
}
if (truncate.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
let rel = RelTag {
spcnode,
dbnode,
relnode,
forknum: FSM_FORKNUM,
};
return Ok(Some(SmgrRecord::Truncate(SmgrTruncate {
rel,
to: truncate.blkno,
})));
}
if (truncate.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
let rel = RelTag {
spcnode,
dbnode,
relnode,
forknum: VISIBILITYMAP_FORKNUM,
};
return Ok(Some(SmgrRecord::Truncate(SmgrTruncate {
rel,
to: truncate.blkno,
})));
}
}
Ok(None)
}
/// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
///
/// This is the same logic as in PostgreSQL's smgr_redo() function.
async fn ingest_xlog_smgr_truncate(
&mut self,
truncate: SmgrTruncate,
modification: &mut DatadirModification<'_>,
rec: &XlSmgrTruncate,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let spcnode = rec.rnode.spcnode;
let dbnode = rec.rnode.dbnode;
let relnode = rec.rnode.relnode;
let SmgrTruncate { rel, to } = truncate;
if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
let rel = RelTag {
spcnode,
dbnode,
relnode,
forknum: MAIN_FORKNUM,
};
self.put_rel_truncation(modification, rel, rec.blkno, ctx)
.await?;
if rel.forknum == MAIN_FORKNUM {
self.put_rel_truncation(modification, rel, to, ctx).await?;
}
if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
let rel = RelTag {
spcnode,
dbnode,
relnode,
forknum: FSM_FORKNUM,
};
let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
if rel.forknum == FSM_FORKNUM {
let fsm_logical_page_no = to / pg_constants::SLOTS_PER_FSM_PAGE;
let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
if to % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
// Tail of last remaining FSM page has to be zeroed.
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
@@ -1472,16 +1537,9 @@ impl WalIngest {
.await?;
}
}
if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
let rel = RelTag {
spcnode,
dbnode,
relnode,
forknum: VISIBILITYMAP_FORKNUM,
};
let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
if rel.forknum == VISIBILITYMAP_FORKNUM {
let mut vm_page_no = to / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
if to % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
// Tail of last remaining vm page has to be zeroed.
// We are not precise here and instead of digging in VM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, vm_page_no)?;