Handle update of VM in XLOG_HEAP_LOCK/XLOG_HEAP2_LOCK_UPDATED WAL records (#4896)

## Problem

VM should be updated if XLH_LOCK_ALL_FROZEN_CLEARED flags is set in
XLOG_HEAP_LOCK,XLOG_HEAP_2_LOCK_UPDATED WAL records

## Summary of changes

Add handling of this records in walingest.rs

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
This commit is contained in:
Konstantin Knizhnik
2023-09-15 17:47:29 +03:00
committed by GitHub
parent 9e6b5b686c
commit 66fa176cc8
4 changed files with 244 additions and 13 deletions

View File

@@ -137,9 +137,12 @@ pub const XLOG_HEAP_INSERT: u8 = 0x00;
pub const XLOG_HEAP_DELETE: u8 = 0x10;
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
pub const XLOG_HEAP_LOCK: u8 = 0x60;
pub const XLOG_HEAP_INIT_PAGE: u8 = 0x80;
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
pub const XLOG_HEAP2_LOCK_UPDATED: u8 = 0x60;
pub const XLH_LOCK_ALL_FROZEN_CLEARED: u8 = 0x01;
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;

View File

@@ -444,6 +444,7 @@ impl<'a> WalIngest<'a> {
// need to clear the corresponding bits in the visibility map.
let mut new_heap_blkno: Option<u32> = None;
let mut old_heap_blkno: Option<u32> = None;
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
match self.timeline.pg_version {
14 => {
@@ -479,6 +480,12 @@ impl<'a> WalIngest<'a> {
// set.
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_LOCK {
let xlrec = v14::XlHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -497,6 +504,12 @@ impl<'a> WalIngest<'a> {
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
let xlrec = v14::XlHeapLockUpdated::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -535,6 +548,12 @@ impl<'a> WalIngest<'a> {
// set.
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_LOCK {
let xlrec = v15::XlHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -553,6 +572,12 @@ impl<'a> WalIngest<'a> {
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
let xlrec = v15::XlHeapLockUpdated::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -591,6 +616,12 @@ impl<'a> WalIngest<'a> {
// set.
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_LOCK {
let xlrec = v16::XlHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -609,6 +640,12 @@ impl<'a> WalIngest<'a> {
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
let xlrec = v16::XlHeapLockUpdated::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -616,7 +653,6 @@ impl<'a> WalIngest<'a> {
}
_ => {}
}
// FIXME: What about XLOG_HEAP_LOCK and XLOG_HEAP2_LOCK_UPDATED?
// Clear the VM bits if required.
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
@@ -660,7 +696,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -676,7 +712,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -690,7 +726,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -717,6 +753,8 @@ impl<'a> WalIngest<'a> {
// need to clear the corresponding bits in the visibility map.
let mut new_heap_blkno: Option<u32> = None;
let mut old_heap_blkno: Option<u32> = None;
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
match self.timeline.pg_version {
@@ -772,7 +810,11 @@ impl<'a> WalIngest<'a> {
}
}
pg_constants::XLOG_NEON_HEAP_LOCK => {
/* XLOG_NEON_HEAP_LOCK doesn't need special care */
let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
}
@@ -783,8 +825,6 @@ impl<'a> WalIngest<'a> {
),
}
// FIXME: What about XLOG_NEON_HEAP_LOCK?
// Clear the VM bits if required.
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
let vm_rel = RelTag {
@@ -827,7 +867,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -843,7 +883,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -857,7 +897,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)

View File

@@ -219,20 +219,66 @@ pub mod v14 {
old_offnum: buf.get_u16_le(),
old_infobits_set: buf.get_u8(),
flags: buf.get_u8(),
t_cid: buf.get_u32(),
t_cid: buf.get_u32_le(),
new_xmax: buf.get_u32_le(),
new_offnum: buf.get_u16_le(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapLock {
pub locking_xid: TransactionId,
pub offnum: OffsetNumber,
pub _padding: u16,
pub t_cid: u32,
pub infobits_set: u8,
pub flags: u8,
}
impl XlHeapLock {
pub fn decode(buf: &mut Bytes) -> XlHeapLock {
XlHeapLock {
locking_xid: buf.get_u32_le(),
offnum: buf.get_u16_le(),
_padding: buf.get_u16_le(),
t_cid: buf.get_u32_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapLockUpdated {
pub xmax: TransactionId,
pub offnum: OffsetNumber,
pub infobits_set: u8,
pub flags: u8,
}
impl XlHeapLockUpdated {
pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
XlHeapLockUpdated {
xmax: buf.get_u32_le(),
offnum: buf.get_u16_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
}
pub mod v15 {
pub use super::v14::{XlHeapDelete, XlHeapInsert, XlHeapMultiInsert, XlHeapUpdate};
pub use super::v14::{
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
};
}
pub mod v16 {
pub use super::v14::{XlHeapInsert, XlHeapMultiInsert};
pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
use bytes::{Buf, Bytes};
use postgres_ffi::{OffsetNumber, TransactionId};
@@ -278,6 +324,26 @@ pub mod v16 {
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapLock {
pub locking_xid: TransactionId,
pub offnum: OffsetNumber,
pub infobits_set: u8,
pub flags: u8,
}
impl XlHeapLock {
pub fn decode(buf: &mut Bytes) -> XlHeapLock {
XlHeapLock {
locking_xid: buf.get_u32_le(),
offnum: buf.get_u16_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
/* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
pub mod rm_neon {
use bytes::{Buf, Bytes};
@@ -366,6 +432,28 @@ pub mod v16 {
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlNeonHeapLock {
pub locking_xid: TransactionId,
pub t_cid: u32,
pub offnum: OffsetNumber,
pub infobits_set: u8,
pub flags: u8,
}
impl XlNeonHeapLock {
pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
XlNeonHeapLock {
locking_xid: buf.get_u32_le(),
t_cid: buf.get_u32_le(),
offnum: buf.get_u16_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
}
}

View File

@@ -111,3 +111,103 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
assert cur_new.fetchall() == []
cur_new.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1")
assert cur_new.fetchall() == []
#
# Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK
# record.
#
def test_vm_bit_clear_on_heap_lock(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_vm_bit_clear_on_heap_lock", "empty")
endpoint = env.endpoints.create_start(
"test_vm_bit_clear_on_heap_lock",
config_lines=[
"log_autovacuum_min_duration = 0",
# Perform anti-wraparound vacuuming aggressively
"autovacuum_naptime='1 s'",
"autovacuum_freeze_max_age = 1000000",
],
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Install extension containing function needed for test
cur.execute("CREATE EXTENSION neon_test_utils")
cur.execute("SELECT pg_switch_wal()")
# Create a test table and freeze it to set the all-frozen VM bit on all pages.
cur.execute("CREATE TABLE vmtest_lock (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_lock SELECT g FROM generate_series(1, 50000) g")
cur.execute("VACUUM FREEZE vmtest_lock")
# Lock a row. This clears the all-frozen VM bit for that page.
cur.execute("SELECT * FROM vmtest_lock WHERE id = 40000 FOR UPDATE")
# Remember the XID. We will use it later to verify that we have consumed a lot of
# XIDs after this.
cur.execute("select pg_current_xact_id()")
locking_xid = cur.fetchall()[0][0]
# Stop and restart postgres, to clear the buffer cache.
#
# NOTE: clear_buffer_cache() will not do, because it evicts the dirty pages
# in a "clean" way. Our neon extension will write a full-page image of the VM
# page, and we want to avoid that.
endpoint.stop()
endpoint.start()
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ")
tup = cur.fetchall()
xmax_before = tup[0][1]
# Consume a lot of XIDs, so that anti-wraparound autovacuum kicks
# in and the clog gets truncated. We set autovacuum_freeze_max_age to a very
# low value, so it doesn't take all that many XIDs for autovacuum to kick in.
for i in range(1000):
cur.execute(
"""
CREATE TEMP TABLE othertable (i int) ON COMMIT DROP;
do $$
begin
for i in 1..100000 loop
-- Use a begin-exception block to generate a new subtransaction on each iteration
begin
insert into othertable values (i);
exception when others then
raise 'not expected %', sqlerrm;
end;
end loop;
end;
$$;
"""
)
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ")
tup = cur.fetchall()
log.info(f"tuple = {tup}")
xmax = tup[0][1]
assert xmax == xmax_before
if i % 50 == 0:
cur.execute("select datfrozenxid from pg_database where datname='postgres'")
datfrozenxid = cur.fetchall()[0][0]
if datfrozenxid > locking_xid:
break
cur.execute("select pg_current_xact_id()")
curr_xid = cur.fetchall()[0][0]
assert int(curr_xid) - int(locking_xid) >= 100000
# Now, if the VM all-frozen bit was not correctly cleared on
# replay, we will try to fetch the status of the XID that was
# already truncated away.
#
# ERROR: could not access status of transaction 1027
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 for update")
tup = cur.fetchall()
log.info(f"tuple = {tup}")