turns on ingest_neonrmgr_record is just copy-pasta, re-do copy-pasta

This commit is contained in:
Christian Schwarz
2024-01-04 12:52:34 +00:00
parent 31fc069482
commit 92280727df

View File

@@ -875,62 +875,70 @@ impl WalIngest {
}
// Clear the VM bits if required.
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
let vm_rel = RelTag {
forknum: VISIBILITYMAP_FORKNUM,
spcnode: decoded.blocks[0].rnode_spcnode,
dbnode: decoded.blocks[0].rnode_dbnode,
relnode: decoded.blocks[0].rnode_relnode,
let vm_rel = RelTag {
forknum: VISIBILITYMAP_FORKNUM,
spcnode: decoded.blocks[0].rnode_spcnode,
dbnode: decoded.blocks[0].rnode_dbnode,
relnode: decoded.blocks[0].rnode_relnode,
};
let new = new_heap_blkno.map(|x| (x, pg_constants::HEAPBLK_TO_MAPBLOCK(x)));
let old = old_heap_blkno.map(|x| (x, pg_constants::HEAPBLK_TO_MAPBLOCK(x)));
// Sometimes, Postgres seems to create heap WAL records with the
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
// not set. In fact, it's possible that the VM page does not exist at all.
// In that case, we don't want to store a record to clear the VM bit;
// replaying it would fail to find the previous image of the page, because
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
// record if it doesn't.
let (new, old) = {
let vm_size = if new.or(old).is_some() {
Some(get_relsize(modification, vm_rel, ctx).await?)
} else {
None
};
let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
// Sometimes, Postgres seems to create heap WAL records with the
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
// not set. In fact, it's possible that the VM page does not exist at all.
// In that case, we don't want to store a record to clear the VM bit;
// replaying it would fail to find the previous image of the page, because
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
// record if it doesn't.
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
if let Some(blknum) = new_vm_blk {
if blknum >= vm_size {
new_vm_blk = None;
}
}
if let Some(blknum) = old_vm_blk {
if blknum >= vm_size {
old_vm_blk = None;
}
}
if new_vm_blk.is_some() || old_vm_blk.is_some() {
if new_vm_blk == old_vm_blk {
// An UPDATE record that needs to clear the bits for both old and the
// new page, both of which reside on the same VM page.
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk.unwrap(),
NeonWalRecord::ClearVisibilityMapFlags {
heap_blkno_1: new_heap_blkno,
heap_blkno_2: old_heap_blkno,
flags,
},
ctx,
)
.await?;
let filter = |(heap_blk, vm_blk)| {
let vm_size = vm_size.expect("we set it to Some() if new or old is Some()");
if vm_blk >= vm_size {
None
} else {
// Clear VM bits for one heap page, or for two pages that reside on
// different VM pages.
if let Some(new_vm_blk) = new_vm_blk {
Some((heap_blk, vm_blk))
}
};
(new.and_then(filter), old.and_then(filter))
};
match (new, old) {
(Some((new_heap_blkno, new_vm_blk)), Some((old_heap_blkno, old_vm_blk)))
if new_vm_blk == old_vm_blk =>
{
// An UPDATE record that needs to clear the bits for both old and the
// new page, both of which reside on the same VM page.
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk, // could also be old_vm_blk, they're the same
NeonWalRecord::ClearVisibilityMapFlags {
heap_blkno_1: Some(new_heap_blkno),
heap_blkno_2: Some(old_heap_blkno),
flags,
},
ctx,
)
.await?;
}
(new, old) => {
// Emit one record per VM block that needs updating.
for update in [new, old] {
if let Some((heap_blkno, vm_blk)) = update {
self.put_rel_wal_record(
modification,
vm_rel,
new_vm_blk,
vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
heap_blkno_1: new_heap_blkno,
heap_blkno_1: Some(heap_blkno),
heap_blkno_2: None,
flags,
},
@@ -938,20 +946,6 @@ impl WalIngest {
)
.await?;
}
if let Some(old_vm_blk) = old_vm_blk {
self.put_rel_wal_record(
modification,
vm_rel,
old_vm_blk,
NeonWalRecord::ClearVisibilityMapFlags {
heap_blkno_1: None,
heap_blkno_2: old_heap_blkno,
flags,
},
ctx,
)
.await?;
}
}
}
}