pageserver: refactor multixact records

This commit is contained in:
Vlad Lazar
2024-10-18 18:43:27 +02:00
parent 813bd0213d
commit 79bd6f8d44

View File

@@ -189,6 +189,18 @@ struct XactPrepare {
data: Bytes,
}
enum MultiXactRecord {
ZeroPage(MultiXactZeroPage),
Create(XlMultiXactCreate),
Truncate(XlMultiXactTruncate),
}
struct MultiXactZeroPage {
slru_kind: SlruKind,
segno: u32,
rpageno: u32,
}
impl WalIngest {
pub async fn new(
timeline: &Timeline,
@@ -342,49 +354,22 @@ impl WalIngest {
}
}
pg_constants::RM_MULTIXACT_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
let pageno = if pg_version < 17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32
};
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
SlruKind::MultiXactOffsets,
segno,
rpageno,
ZERO_PAGE.clone(),
ctx,
)
.await?;
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
let pageno = if pg_version < 17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32
};
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
SlruKind::MultiXactMembers,
segno,
rpageno,
ZERO_PAGE.clone(),
ctx,
)
.await?;
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
self.ingest_multixact_create_record(modification, &xlrec)?;
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
.await?;
let maybe_multixact_record =
Self::decode_multixact_record(&mut buf, &decoded, pg_version).unwrap();
if let Some(multixact_record) = maybe_multixact_record {
match multixact_record {
MultiXactRecord::ZeroPage(zero_page) => {
self.ingest_multixact_zero_page_record(zero_page, modification, ctx)
.await?;
}
MultiXactRecord::Create(create) => {
self.ingest_multixact_create_record(modification, &create)?;
}
MultiXactRecord::Truncate(truncate) => {
self.ingest_multixact_truncate_record(modification, &truncate, ctx)
.await?;
}
}
}
}
pg_constants::RM_RELMAP_ID => {
@@ -2126,6 +2111,68 @@ impl WalIngest {
Ok(())
}
async fn ingest_multixact_zero_page_record(
&mut self,
zero_page: MultiXactZeroPage,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
let MultiXactZeroPage {
slru_kind,
segno,
rpageno,
} = zero_page;
self.put_slru_page_image(
modification,
slru_kind,
segno,
rpageno,
ZERO_PAGE.clone(),
ctx,
)
.await
}
fn decode_multixact_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
pg_version: u32,
) -> anyhow::Result<Option<MultiXactRecord>> {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
|| info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
{
let pageno = if pg_version < 17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32
};
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
let slru_kind = match info {
pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets,
pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers,
_ => unreachable!(),
};
return Ok(Some(MultiXactRecord::ZeroPage(MultiXactZeroPage {
slru_kind,
segno,
rpageno,
})));
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(buf);
return Ok(Some(MultiXactRecord::Create(xlrec)));
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(buf);
return Ok(Some(MultiXactRecord::Truncate(xlrec)));
}
Ok(None)
}
async fn ingest_relmap_page(
&mut self,
modification: &mut DatadirModification<'_>,