mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
Handle update of VM in XLOG_HEAP_LOCK/XLOG_HEAP2_LOCK_UPDATED WAL records (#4896)
## Problem VM should be updated if XLH_LOCK_ALL_FROZEN_CLEARED flags is set in XLOG_HEAP_LOCK,XLOG_HEAP_2_LOCK_UPDATED WAL records ## Summary of changes Add handling of this records in walingest.rs ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist --------- Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
This commit is contained in:
committed by
GitHub
parent
9e6b5b686c
commit
66fa176cc8
@@ -137,9 +137,12 @@ pub const XLOG_HEAP_INSERT: u8 = 0x00;
|
|||||||
pub const XLOG_HEAP_DELETE: u8 = 0x10;
|
pub const XLOG_HEAP_DELETE: u8 = 0x10;
|
||||||
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
|
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
|
||||||
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
|
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
|
||||||
|
pub const XLOG_HEAP_LOCK: u8 = 0x60;
|
||||||
pub const XLOG_HEAP_INIT_PAGE: u8 = 0x80;
|
pub const XLOG_HEAP_INIT_PAGE: u8 = 0x80;
|
||||||
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
|
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
|
||||||
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
|
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
|
||||||
|
pub const XLOG_HEAP2_LOCK_UPDATED: u8 = 0x60;
|
||||||
|
pub const XLH_LOCK_ALL_FROZEN_CLEARED: u8 = 0x01;
|
||||||
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
|
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
|
||||||
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
||||||
pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
||||||
|
|||||||
@@ -444,6 +444,7 @@ impl<'a> WalIngest<'a> {
|
|||||||
// need to clear the corresponding bits in the visibility map.
|
// need to clear the corresponding bits in the visibility map.
|
||||||
let mut new_heap_blkno: Option<u32> = None;
|
let mut new_heap_blkno: Option<u32> = None;
|
||||||
let mut old_heap_blkno: Option<u32> = None;
|
let mut old_heap_blkno: Option<u32> = None;
|
||||||
|
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
|
||||||
|
|
||||||
match self.timeline.pg_version {
|
match self.timeline.pg_version {
|
||||||
14 => {
|
14 => {
|
||||||
@@ -479,6 +480,12 @@ impl<'a> WalIngest<'a> {
|
|||||||
// set.
|
// set.
|
||||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
}
|
}
|
||||||
|
} else if info == pg_constants::XLOG_HEAP_LOCK {
|
||||||
|
let xlrec = v14::XlHeapLock::decode(buf);
|
||||||
|
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||||
|
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
|
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||||
@@ -497,6 +504,12 @@ impl<'a> WalIngest<'a> {
|
|||||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
}
|
}
|
||||||
|
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
|
||||||
|
let xlrec = v14::XlHeapLockUpdated::decode(buf);
|
||||||
|
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||||
|
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
|
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||||
@@ -535,6 +548,12 @@ impl<'a> WalIngest<'a> {
|
|||||||
// set.
|
// set.
|
||||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
}
|
}
|
||||||
|
} else if info == pg_constants::XLOG_HEAP_LOCK {
|
||||||
|
let xlrec = v15::XlHeapLock::decode(buf);
|
||||||
|
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||||
|
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
|
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||||
@@ -553,6 +572,12 @@ impl<'a> WalIngest<'a> {
|
|||||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
}
|
}
|
||||||
|
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
|
||||||
|
let xlrec = v15::XlHeapLockUpdated::decode(buf);
|
||||||
|
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||||
|
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
|
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||||
@@ -591,6 +616,12 @@ impl<'a> WalIngest<'a> {
|
|||||||
// set.
|
// set.
|
||||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
}
|
}
|
||||||
|
} else if info == pg_constants::XLOG_HEAP_LOCK {
|
||||||
|
let xlrec = v16::XlHeapLock::decode(buf);
|
||||||
|
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||||
|
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
|
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||||
@@ -609,6 +640,12 @@ impl<'a> WalIngest<'a> {
|
|||||||
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
|
||||||
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
new_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
}
|
}
|
||||||
|
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
|
||||||
|
let xlrec = v16::XlHeapLockUpdated::decode(buf);
|
||||||
|
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||||
|
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
|
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||||
@@ -616,7 +653,6 @@ impl<'a> WalIngest<'a> {
|
|||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
// FIXME: What about XLOG_HEAP_LOCK and XLOG_HEAP2_LOCK_UPDATED?
|
|
||||||
|
|
||||||
// Clear the VM bits if required.
|
// Clear the VM bits if required.
|
||||||
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
|
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
|
||||||
@@ -660,7 +696,7 @@ impl<'a> WalIngest<'a> {
|
|||||||
NeonWalRecord::ClearVisibilityMapFlags {
|
NeonWalRecord::ClearVisibilityMapFlags {
|
||||||
new_heap_blkno,
|
new_heap_blkno,
|
||||||
old_heap_blkno,
|
old_heap_blkno,
|
||||||
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
|
flags,
|
||||||
},
|
},
|
||||||
ctx,
|
ctx,
|
||||||
)
|
)
|
||||||
@@ -676,7 +712,7 @@ impl<'a> WalIngest<'a> {
|
|||||||
NeonWalRecord::ClearVisibilityMapFlags {
|
NeonWalRecord::ClearVisibilityMapFlags {
|
||||||
new_heap_blkno,
|
new_heap_blkno,
|
||||||
old_heap_blkno: None,
|
old_heap_blkno: None,
|
||||||
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
|
flags,
|
||||||
},
|
},
|
||||||
ctx,
|
ctx,
|
||||||
)
|
)
|
||||||
@@ -690,7 +726,7 @@ impl<'a> WalIngest<'a> {
|
|||||||
NeonWalRecord::ClearVisibilityMapFlags {
|
NeonWalRecord::ClearVisibilityMapFlags {
|
||||||
new_heap_blkno: None,
|
new_heap_blkno: None,
|
||||||
old_heap_blkno,
|
old_heap_blkno,
|
||||||
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
|
flags,
|
||||||
},
|
},
|
||||||
ctx,
|
ctx,
|
||||||
)
|
)
|
||||||
@@ -717,6 +753,8 @@ impl<'a> WalIngest<'a> {
|
|||||||
// need to clear the corresponding bits in the visibility map.
|
// need to clear the corresponding bits in the visibility map.
|
||||||
let mut new_heap_blkno: Option<u32> = None;
|
let mut new_heap_blkno: Option<u32> = None;
|
||||||
let mut old_heap_blkno: Option<u32> = None;
|
let mut old_heap_blkno: Option<u32> = None;
|
||||||
|
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
|
||||||
|
|
||||||
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
|
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
|
||||||
|
|
||||||
match self.timeline.pg_version {
|
match self.timeline.pg_version {
|
||||||
@@ -772,7 +810,11 @@ impl<'a> WalIngest<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
pg_constants::XLOG_NEON_HEAP_LOCK => {
|
pg_constants::XLOG_NEON_HEAP_LOCK => {
|
||||||
/* XLOG_NEON_HEAP_LOCK doesn't need special care */
|
let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
|
||||||
|
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
|
||||||
|
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||||
|
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
|
info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
|
||||||
}
|
}
|
||||||
@@ -783,8 +825,6 @@ impl<'a> WalIngest<'a> {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: What about XLOG_NEON_HEAP_LOCK?
|
|
||||||
|
|
||||||
// Clear the VM bits if required.
|
// Clear the VM bits if required.
|
||||||
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
|
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
|
||||||
let vm_rel = RelTag {
|
let vm_rel = RelTag {
|
||||||
@@ -827,7 +867,7 @@ impl<'a> WalIngest<'a> {
|
|||||||
NeonWalRecord::ClearVisibilityMapFlags {
|
NeonWalRecord::ClearVisibilityMapFlags {
|
||||||
new_heap_blkno,
|
new_heap_blkno,
|
||||||
old_heap_blkno,
|
old_heap_blkno,
|
||||||
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
|
flags,
|
||||||
},
|
},
|
||||||
ctx,
|
ctx,
|
||||||
)
|
)
|
||||||
@@ -843,7 +883,7 @@ impl<'a> WalIngest<'a> {
|
|||||||
NeonWalRecord::ClearVisibilityMapFlags {
|
NeonWalRecord::ClearVisibilityMapFlags {
|
||||||
new_heap_blkno,
|
new_heap_blkno,
|
||||||
old_heap_blkno: None,
|
old_heap_blkno: None,
|
||||||
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
|
flags,
|
||||||
},
|
},
|
||||||
ctx,
|
ctx,
|
||||||
)
|
)
|
||||||
@@ -857,7 +897,7 @@ impl<'a> WalIngest<'a> {
|
|||||||
NeonWalRecord::ClearVisibilityMapFlags {
|
NeonWalRecord::ClearVisibilityMapFlags {
|
||||||
new_heap_blkno: None,
|
new_heap_blkno: None,
|
||||||
old_heap_blkno,
|
old_heap_blkno,
|
||||||
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
|
flags,
|
||||||
},
|
},
|
||||||
ctx,
|
ctx,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -219,20 +219,66 @@ pub mod v14 {
|
|||||||
old_offnum: buf.get_u16_le(),
|
old_offnum: buf.get_u16_le(),
|
||||||
old_infobits_set: buf.get_u8(),
|
old_infobits_set: buf.get_u8(),
|
||||||
flags: buf.get_u8(),
|
flags: buf.get_u8(),
|
||||||
t_cid: buf.get_u32(),
|
t_cid: buf.get_u32_le(),
|
||||||
new_xmax: buf.get_u32_le(),
|
new_xmax: buf.get_u32_le(),
|
||||||
new_offnum: buf.get_u16_le(),
|
new_offnum: buf.get_u16_le(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct XlHeapLock {
|
||||||
|
pub locking_xid: TransactionId,
|
||||||
|
pub offnum: OffsetNumber,
|
||||||
|
pub _padding: u16,
|
||||||
|
pub t_cid: u32,
|
||||||
|
pub infobits_set: u8,
|
||||||
|
pub flags: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl XlHeapLock {
|
||||||
|
pub fn decode(buf: &mut Bytes) -> XlHeapLock {
|
||||||
|
XlHeapLock {
|
||||||
|
locking_xid: buf.get_u32_le(),
|
||||||
|
offnum: buf.get_u16_le(),
|
||||||
|
_padding: buf.get_u16_le(),
|
||||||
|
t_cid: buf.get_u32_le(),
|
||||||
|
infobits_set: buf.get_u8(),
|
||||||
|
flags: buf.get_u8(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct XlHeapLockUpdated {
|
||||||
|
pub xmax: TransactionId,
|
||||||
|
pub offnum: OffsetNumber,
|
||||||
|
pub infobits_set: u8,
|
||||||
|
pub flags: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl XlHeapLockUpdated {
|
||||||
|
pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
|
||||||
|
XlHeapLockUpdated {
|
||||||
|
xmax: buf.get_u32_le(),
|
||||||
|
offnum: buf.get_u16_le(),
|
||||||
|
infobits_set: buf.get_u8(),
|
||||||
|
flags: buf.get_u8(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub mod v15 {
|
pub mod v15 {
|
||||||
pub use super::v14::{XlHeapDelete, XlHeapInsert, XlHeapMultiInsert, XlHeapUpdate};
|
pub use super::v14::{
|
||||||
|
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub mod v16 {
|
pub mod v16 {
|
||||||
pub use super::v14::{XlHeapInsert, XlHeapMultiInsert};
|
pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
|
||||||
use bytes::{Buf, Bytes};
|
use bytes::{Buf, Bytes};
|
||||||
use postgres_ffi::{OffsetNumber, TransactionId};
|
use postgres_ffi::{OffsetNumber, TransactionId};
|
||||||
|
|
||||||
@@ -278,6 +324,26 @@ pub mod v16 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct XlHeapLock {
|
||||||
|
pub locking_xid: TransactionId,
|
||||||
|
pub offnum: OffsetNumber,
|
||||||
|
pub infobits_set: u8,
|
||||||
|
pub flags: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl XlHeapLock {
|
||||||
|
pub fn decode(buf: &mut Bytes) -> XlHeapLock {
|
||||||
|
XlHeapLock {
|
||||||
|
locking_xid: buf.get_u32_le(),
|
||||||
|
offnum: buf.get_u16_le(),
|
||||||
|
infobits_set: buf.get_u8(),
|
||||||
|
flags: buf.get_u8(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
|
/* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
|
||||||
pub mod rm_neon {
|
pub mod rm_neon {
|
||||||
use bytes::{Buf, Bytes};
|
use bytes::{Buf, Bytes};
|
||||||
@@ -366,6 +432,28 @@ pub mod v16 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct XlNeonHeapLock {
|
||||||
|
pub locking_xid: TransactionId,
|
||||||
|
pub t_cid: u32,
|
||||||
|
pub offnum: OffsetNumber,
|
||||||
|
pub infobits_set: u8,
|
||||||
|
pub flags: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl XlNeonHeapLock {
|
||||||
|
pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
|
||||||
|
XlNeonHeapLock {
|
||||||
|
locking_xid: buf.get_u32_le(),
|
||||||
|
t_cid: buf.get_u32_le(),
|
||||||
|
offnum: buf.get_u16_le(),
|
||||||
|
infobits_set: buf.get_u8(),
|
||||||
|
flags: buf.get_u8(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -111,3 +111,103 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
|
|||||||
assert cur_new.fetchall() == []
|
assert cur_new.fetchall() == []
|
||||||
cur_new.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1")
|
cur_new.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1")
|
||||||
assert cur_new.fetchall() == []
|
assert cur_new.fetchall() == []
|
||||||
|
|
||||||
|
|
||||||
|
#
|
||||||
|
# Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK
|
||||||
|
# record.
|
||||||
|
#
|
||||||
|
def test_vm_bit_clear_on_heap_lock(neon_simple_env: NeonEnv):
|
||||||
|
env = neon_simple_env
|
||||||
|
|
||||||
|
env.neon_cli.create_branch("test_vm_bit_clear_on_heap_lock", "empty")
|
||||||
|
endpoint = env.endpoints.create_start(
|
||||||
|
"test_vm_bit_clear_on_heap_lock",
|
||||||
|
config_lines=[
|
||||||
|
"log_autovacuum_min_duration = 0",
|
||||||
|
# Perform anti-wraparound vacuuming aggressively
|
||||||
|
"autovacuum_naptime='1 s'",
|
||||||
|
"autovacuum_freeze_max_age = 1000000",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
pg_conn = endpoint.connect()
|
||||||
|
cur = pg_conn.cursor()
|
||||||
|
|
||||||
|
# Install extension containing function needed for test
|
||||||
|
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||||
|
|
||||||
|
cur.execute("SELECT pg_switch_wal()")
|
||||||
|
|
||||||
|
# Create a test table and freeze it to set the all-frozen VM bit on all pages.
|
||||||
|
cur.execute("CREATE TABLE vmtest_lock (id integer PRIMARY KEY)")
|
||||||
|
cur.execute("INSERT INTO vmtest_lock SELECT g FROM generate_series(1, 50000) g")
|
||||||
|
cur.execute("VACUUM FREEZE vmtest_lock")
|
||||||
|
|
||||||
|
# Lock a row. This clears the all-frozen VM bit for that page.
|
||||||
|
cur.execute("SELECT * FROM vmtest_lock WHERE id = 40000 FOR UPDATE")
|
||||||
|
|
||||||
|
# Remember the XID. We will use it later to verify that we have consumed a lot of
|
||||||
|
# XIDs after this.
|
||||||
|
cur.execute("select pg_current_xact_id()")
|
||||||
|
locking_xid = cur.fetchall()[0][0]
|
||||||
|
|
||||||
|
# Stop and restart postgres, to clear the buffer cache.
|
||||||
|
#
|
||||||
|
# NOTE: clear_buffer_cache() will not do, because it evicts the dirty pages
|
||||||
|
# in a "clean" way. Our neon extension will write a full-page image of the VM
|
||||||
|
# page, and we want to avoid that.
|
||||||
|
endpoint.stop()
|
||||||
|
endpoint.start()
|
||||||
|
pg_conn = endpoint.connect()
|
||||||
|
cur = pg_conn.cursor()
|
||||||
|
|
||||||
|
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ")
|
||||||
|
tup = cur.fetchall()
|
||||||
|
xmax_before = tup[0][1]
|
||||||
|
|
||||||
|
# Consume a lot of XIDs, so that anti-wraparound autovacuum kicks
|
||||||
|
# in and the clog gets truncated. We set autovacuum_freeze_max_age to a very
|
||||||
|
# low value, so it doesn't take all that many XIDs for autovacuum to kick in.
|
||||||
|
for i in range(1000):
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
CREATE TEMP TABLE othertable (i int) ON COMMIT DROP;
|
||||||
|
do $$
|
||||||
|
begin
|
||||||
|
for i in 1..100000 loop
|
||||||
|
-- Use a begin-exception block to generate a new subtransaction on each iteration
|
||||||
|
begin
|
||||||
|
insert into othertable values (i);
|
||||||
|
exception when others then
|
||||||
|
raise 'not expected %', sqlerrm;
|
||||||
|
end;
|
||||||
|
end loop;
|
||||||
|
end;
|
||||||
|
$$;
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ")
|
||||||
|
tup = cur.fetchall()
|
||||||
|
log.info(f"tuple = {tup}")
|
||||||
|
xmax = tup[0][1]
|
||||||
|
assert xmax == xmax_before
|
||||||
|
|
||||||
|
if i % 50 == 0:
|
||||||
|
cur.execute("select datfrozenxid from pg_database where datname='postgres'")
|
||||||
|
datfrozenxid = cur.fetchall()[0][0]
|
||||||
|
if datfrozenxid > locking_xid:
|
||||||
|
break
|
||||||
|
|
||||||
|
cur.execute("select pg_current_xact_id()")
|
||||||
|
curr_xid = cur.fetchall()[0][0]
|
||||||
|
assert int(curr_xid) - int(locking_xid) >= 100000
|
||||||
|
|
||||||
|
# Now, if the VM all-frozen bit was not correctly cleared on
|
||||||
|
# replay, we will try to fetch the status of the XID that was
|
||||||
|
# already truncated away.
|
||||||
|
#
|
||||||
|
# ERROR: could not access status of transaction 1027
|
||||||
|
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 for update")
|
||||||
|
tup = cur.fetchall()
|
||||||
|
log.info(f"tuple = {tup}")
|
||||||
|
|||||||
Reference in New Issue
Block a user