mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 04:20:39 +00:00
pagesver: refactor heapam records
This commit is contained in:
@@ -107,6 +107,13 @@ struct WarnIngestLag {
|
||||
timestamp_invalid_msg_ratelimit: RateLimit,
|
||||
}
|
||||
|
||||
struct HeapamRecord {
|
||||
new_heap_blkno: Option<u32>,
|
||||
old_heap_blkno: Option<u32>,
|
||||
vm_rel: RelTag,
|
||||
flags: u8,
|
||||
}
|
||||
|
||||
impl WalIngest {
|
||||
pub async fn new(
|
||||
timeline: &Timeline,
|
||||
@@ -182,8 +189,12 @@ impl WalIngest {
|
||||
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
|
||||
// Heap AM records need some special handling, because they modify VM pages
|
||||
// without registering them with the standard mechanism.
|
||||
self.ingest_heapam_record(&mut buf, modification, &decoded, ctx)
|
||||
.await?;
|
||||
let maybe_heapam_record =
|
||||
Self::decode_heapam_record(&mut buf, &decoded, pg_version)?;
|
||||
if let Some(heapam_record) = maybe_heapam_record {
|
||||
self.ingest_heapam_record(heapam_record, modification, ctx)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
pg_constants::RM_NEON_ID => {
|
||||
self.ingest_neonrmgr_record(&mut buf, modification, &decoded, ctx)
|
||||
@@ -711,11 +722,97 @@ impl WalIngest {
|
||||
|
||||
async fn ingest_heapam_record(
|
||||
&mut self,
|
||||
buf: &mut Bytes,
|
||||
heapam_record: HeapamRecord,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
decoded: &DecodedWALRecord,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let HeapamRecord {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
vm_rel,
|
||||
} = heapam_record;
|
||||
// Clear the VM bits if required.
|
||||
let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
|
||||
// Sometimes, Postgres seems to create heap WAL records with the
|
||||
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
|
||||
// not set. In fact, it's possible that the VM page does not exist at all.
|
||||
// In that case, we don't want to store a record to clear the VM bit;
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
new_vm_blk = None;
|
||||
}
|
||||
}
|
||||
if let Some(blknum) = old_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
old_vm_blk = None;
|
||||
}
|
||||
}
|
||||
|
||||
if new_vm_blk.is_some() || old_vm_blk.is_some() {
|
||||
if new_vm_blk == old_vm_blk {
|
||||
// An UPDATE record that needs to clear the bits for both old and the
|
||||
// new page, both of which reside on the same VM page.
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_vm_blk.unwrap(),
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
// Clear VM bits for one heap page, or for two pages that reside on
|
||||
// different VM pages.
|
||||
if let Some(new_vm_blk) = new_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno: None,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
if let Some(old_vm_blk) = old_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
old_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno: None,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn decode_heapam_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<HeapamRecord>> {
|
||||
// Handle VM bit updates that are implicitly part of heap records.
|
||||
|
||||
// First, look at the record to determine which VM bits need
|
||||
@@ -725,7 +822,7 @@ impl WalIngest {
|
||||
let mut old_heap_blkno: Option<u32> = None;
|
||||
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
|
||||
|
||||
match modification.tline.pg_version {
|
||||
match pg_version {
|
||||
14 => {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
@@ -997,7 +1094,6 @@ impl WalIngest {
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Clear the VM bits if required.
|
||||
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
|
||||
let vm_rel = RelTag {
|
||||
forknum: VISIBILITYMAP_FORKNUM,
|
||||
@@ -1006,82 +1102,18 @@ impl WalIngest {
|
||||
relnode: decoded.blocks[0].rnode_relnode,
|
||||
};
|
||||
|
||||
let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
|
||||
// Sometimes, Postgres seems to create heap WAL records with the
|
||||
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
|
||||
// not set. In fact, it's possible that the VM page does not exist at all.
|
||||
// In that case, we don't want to store a record to clear the VM bit;
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
new_vm_blk = None;
|
||||
}
|
||||
}
|
||||
if let Some(blknum) = old_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
old_vm_blk = None;
|
||||
}
|
||||
}
|
||||
|
||||
if new_vm_blk.is_some() || old_vm_blk.is_some() {
|
||||
if new_vm_blk == old_vm_blk {
|
||||
// An UPDATE record that needs to clear the bits for both old and the
|
||||
// new page, both of which reside on the same VM page.
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_vm_blk.unwrap(),
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
// Clear VM bits for one heap page, or for two pages that reside on
|
||||
// different VM pages.
|
||||
if let Some(new_vm_blk) = new_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
new_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno: None,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
if let Some(old_vm_blk) = old_vm_blk {
|
||||
self.put_rel_wal_record(
|
||||
modification,
|
||||
vm_rel,
|
||||
old_vm_blk,
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno: None,
|
||||
old_heap_blkno,
|
||||
flags,
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Some(HeapamRecord {
|
||||
new_heap_blkno,
|
||||
old_heap_blkno,
|
||||
vm_rel,
|
||||
flags,
|
||||
}))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
async fn ingest_neonrmgr_record(
|
||||
&mut self,
|
||||
buf: &mut Bytes,
|
||||
|
||||
Reference in New Issue
Block a user