mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
3 Commits
release-pr
...
problame/p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
429b188f85 | ||
|
|
40a8ed28b7 | ||
|
|
ad130f831a |
@@ -79,6 +79,8 @@ pub const XLOG_XACT_PREPARE: u8 = 0x10;
|
||||
pub const XLOG_XACT_ABORT: u8 = 0x20;
|
||||
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
|
||||
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
|
||||
pub const XLOG_XACT_ASSIGNMENT: u8 = 0x50;
|
||||
pub const XLOG_XACT_INVALIDATIONS: u8 = 0x60;
|
||||
|
||||
// From standbydefs.h
|
||||
pub const XLOG_RUNNING_XACTS: u8 = 0x10;
|
||||
@@ -106,12 +108,6 @@ pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5;
|
||||
// pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6;
|
||||
// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;
|
||||
|
||||
// From pg_control.h and rmgrlist.h
|
||||
pub const XLOG_NEXTOID: u8 = 0x30;
|
||||
pub const XLOG_SWITCH: u8 = 0x40;
|
||||
pub const XLOG_FPI_FOR_HINT: u8 = 0xA0;
|
||||
pub const XLOG_FPI: u8 = 0xB0;
|
||||
|
||||
// From multixact.h
|
||||
pub const FIRST_MULTIXACT_ID: u32 = 1;
|
||||
pub const MAX_MULTIXACT_ID: u32 = 0xFFFFFFFF;
|
||||
@@ -139,12 +135,20 @@ pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
|
||||
pub const XLOG_HEAP_INSERT: u8 = 0x00;
|
||||
pub const XLOG_HEAP_DELETE: u8 = 0x10;
|
||||
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
|
||||
pub const XLOG_HEAP_TRUNCATE: u8 = 0x30;
|
||||
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
|
||||
pub const XLOG_HEAP_CONFIRM: u8 = 0x50;
|
||||
pub const XLOG_HEAP_LOCK: u8 = 0x60;
|
||||
pub const XLOG_HEAP_INPLACE: u8 = 0x70;
|
||||
pub const XLOG_HEAP_INIT_PAGE: u8 = 0x80;
|
||||
pub const XLOG_HEAP2_REWRITE: u8 = 0x00;
|
||||
pub const XLOG_HEAP2_PRUNE: u8 = 0x10;
|
||||
pub const XLOG_HEAP2_VACUUM: u8 = 0x20;
|
||||
pub const XLOG_HEAP2_FREEZE_PAGE: u8 = 0x30;
|
||||
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
|
||||
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
|
||||
pub const XLOG_HEAP2_LOCK_UPDATED: u8 = 0x60;
|
||||
pub const XLOG_HEAP2_NEW_CID: u8 = 0x70;
|
||||
pub const XLH_LOCK_ALL_FROZEN_CLEARED: u8 = 0x01;
|
||||
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
|
||||
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
||||
@@ -167,9 +171,21 @@ pub const RM_RELMAP_ID: u8 = 7;
|
||||
pub const RM_STANDBY_ID: u8 = 8;
|
||||
pub const RM_HEAP2_ID: u8 = 9;
|
||||
pub const RM_HEAP_ID: u8 = 10;
|
||||
pub const RM_BTREE_ID: u8 = 11;
|
||||
pub const RM_HASH_ID: u8 = 12;
|
||||
pub const RM_GIN_ID: u8 = 13;
|
||||
pub const RM_GIST_ID: u8 = 14;
|
||||
pub const RM_SEQ_ID: u8 = 15;
|
||||
pub const RM_SPGIST_ID: u8 = 16;
|
||||
pub const RM_BRIN_ID: u8 = 17;
|
||||
pub const RM_COMMIT_TS_ID: u8 = 18;
|
||||
pub const RM_REPLORIGIN_ID: u8 = 19;
|
||||
pub const RM_GENERIC_ID: u8 = 20;
|
||||
pub const RM_LOGICALMSG_ID: u8 = 21;
|
||||
|
||||
// from relmapper.h
|
||||
pub const XLOG_RELMAP_UPDATE: u8 = 0x0;
|
||||
|
||||
// from neon_rmgr.h
|
||||
pub const RM_NEON_ID: u8 = 134;
|
||||
|
||||
@@ -219,8 +235,22 @@ pub const INVALID_TRANSACTION_ID: u32 = 0;
|
||||
pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000;
|
||||
pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
|
||||
|
||||
/* pg_control.h */
|
||||
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
|
||||
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
|
||||
pub const XLOG_NOOP: u8 = 0x20;
|
||||
pub const XLOG_NEXTOID: u8 = 0x30;
|
||||
pub const XLOG_SWITCH: u8 = 0x40;
|
||||
pub const XLOG_BACKUP_END: u8 = 0x50;
|
||||
pub const XLOG_PARAMETER_CHANGE: u8 = 0x60;
|
||||
pub const XLOG_RESTORE_POINT: u8 = 0x70;
|
||||
pub const XLOG_FPW_CHANGE: u8 = 0x80;
|
||||
pub const XLOG_END_OF_RECOVERY: u8 = 0x90;
|
||||
pub const XLOG_FPI_FOR_HINT: u8 = 0xA0;
|
||||
pub const XLOG_FPI: u8 = 0xB0;
|
||||
/* 0xC0 is used in Postgres 9.5-11 */
|
||||
pub const XLOG_OVERWRITE_CONTRECORD: u8 = 0xD0;
|
||||
|
||||
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
|
||||
pub const XLP_LONG_HEADER: u16 = 0x0002;
|
||||
|
||||
|
||||
@@ -55,6 +55,36 @@ pub struct WalIngest {
|
||||
checkpoint_modified: bool,
|
||||
}
|
||||
|
||||
macro_rules! special_treatment_check {
|
||||
(needs none) => {{
|
||||
// we acknowledge that this record type needs no special treatment
|
||||
}};
|
||||
(unknown record type, $pg_version:expr, $lsn:expr, $decoded:expr) => {{
|
||||
let pg_version: u32 = $pg_version;
|
||||
let lsn: Lsn = $lsn;
|
||||
let decoded: &DecodedWALRecord = $decoded;
|
||||
use std::sync::atomic;
|
||||
static LOGGED: atomic::AtomicBool = atomic::AtomicBool::new(false);
|
||||
if LOGGED
|
||||
.compare_exchange(
|
||||
false,
|
||||
true,
|
||||
atomic::Ordering::Relaxed,
|
||||
atomic::Ordering::Relaxed,
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
warn!(
|
||||
pg_version,
|
||||
%lsn,
|
||||
xl_rmid = %decoded.xl_rmid,
|
||||
xl_info = %decoded.xl_info,
|
||||
"unknown WAL record type, investigate whether it needs special treatment"
|
||||
);
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
impl WalIngest {
|
||||
pub async fn new(
|
||||
timeline: &Timeline,
|
||||
@@ -111,6 +141,7 @@ impl WalIngest {
|
||||
|
||||
failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
|
||||
|
||||
#[allow(clippy::if_same_then_else)]
|
||||
match decoded.xl_rmid {
|
||||
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
|
||||
// Heap AM records need some special handling, because they modify VM pages
|
||||
@@ -134,6 +165,8 @@ impl WalIngest {
|
||||
let truncate = XlSmgrTruncate::decode(&mut buf);
|
||||
self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
|
||||
.await?;
|
||||
} else {
|
||||
special_treatment_check!(needs none);
|
||||
}
|
||||
}
|
||||
pg_constants::RM_DBASE_ID => {
|
||||
@@ -155,6 +188,8 @@ impl WalIngest {
|
||||
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
|
||||
.await?;
|
||||
}
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
} else if pg_version == 15 {
|
||||
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
@@ -175,6 +210,8 @@ impl WalIngest {
|
||||
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
|
||||
.await?;
|
||||
}
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
} else if pg_version == 16 {
|
||||
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
@@ -195,11 +232,16 @@ impl WalIngest {
|
||||
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
|
||||
.await?;
|
||||
}
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
}
|
||||
pg_constants::RM_TBLSPC_ID => {
|
||||
trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
|
||||
todo!() // should we do: special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
pg_constants::RM_CLOG_ID => {
|
||||
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
|
||||
@@ -217,11 +259,12 @@ impl WalIngest {
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
assert!(info == pg_constants::CLOG_TRUNCATE);
|
||||
} else if info == pg_constants::CLOG_TRUNCATE {
|
||||
let xlrec = XlClogTruncate::decode(&mut buf);
|
||||
self.ingest_clog_truncate_record(modification, &xlrec, ctx)
|
||||
.await?;
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
}
|
||||
pg_constants::RM_XACT_ID => {
|
||||
@@ -265,6 +308,12 @@ impl WalIngest {
|
||||
modification
|
||||
.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
|
||||
.await?;
|
||||
} else if info == pg_constants::XLOG_XACT_ASSIGNMENT {
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_XACT_INVALIDATIONS {
|
||||
special_treatment_check!(needs none);
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
}
|
||||
pg_constants::RM_MULTIXACT_ID => {
|
||||
@@ -303,12 +352,20 @@ impl WalIngest {
|
||||
let xlrec = XlMultiXactTruncate::decode(&mut buf);
|
||||
self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
|
||||
.await?;
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
}
|
||||
pg_constants::RM_RELMAP_ID => {
|
||||
let xlrec = XlRelmapUpdate::decode(&mut buf);
|
||||
self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
|
||||
.await?;
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
|
||||
if info == pg_constants::XLOG_RELMAP_UPDATE {
|
||||
let xlrec = XlRelmapUpdate::decode(&mut buf);
|
||||
self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
|
||||
.await?;
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
}
|
||||
pg_constants::RM_XLOG_ID => {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
@@ -318,6 +375,8 @@ impl WalIngest {
|
||||
if self.checkpoint.nextOid != next_oid {
|
||||
self.checkpoint.nextOid = next_oid;
|
||||
self.checkpoint_modified = true;
|
||||
} else {
|
||||
special_treatment_check!(needs none);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|
||||
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
@@ -350,6 +409,31 @@ impl WalIngest {
|
||||
// have some trace of the checkpoint records in the layer files at the same
|
||||
// LSNs.
|
||||
self.checkpoint_modified = true;
|
||||
} else if info == pg_constants::XLOG_FPI || info == pg_constants::XLOG_FPI_FOR_HINT
|
||||
{
|
||||
// These records are importan for us, bu they are handled by
|
||||
// generic ingest_decoded_block() function below. They don't need
|
||||
// any special handling.
|
||||
//
|
||||
// HEIKKI: Is Noop the right code for that case?
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_NOOP
|
||||
|| info == pg_constants::XLOG_NEXTOID
|
||||
|| info == pg_constants::XLOG_SWITCH
|
||||
|| info == pg_constants::XLOG_BACKUP_END
|
||||
|| info == pg_constants::XLOG_PARAMETER_CHANGE
|
||||
|| info == pg_constants::XLOG_RESTORE_POINT
|
||||
|| info == pg_constants::XLOG_FPW_CHANGE
|
||||
|| info == pg_constants::XLOG_END_OF_RECOVERY
|
||||
{
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_OVERWRITE_CONTRECORD {
|
||||
// HEIKKI: I suspect we're not handling these correctly.
|
||||
// See https://github.com/neondatabase/neon/issues/934
|
||||
// Given that, not sure what the right outcome is.
|
||||
todo!()
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
}
|
||||
pg_constants::RM_LOGICALMSG_ID => {
|
||||
@@ -365,9 +449,14 @@ impl WalIngest {
|
||||
// we could peek into the message and only pause if it contains
|
||||
// a particular string, for example, but this is enough for now.
|
||||
failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
|
||||
special_treatment_check!(needs none);
|
||||
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
|
||||
modification.put_file(path, message, ctx).await?;
|
||||
} else {
|
||||
special_treatment_check!(needs none);
|
||||
}
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
}
|
||||
pg_constants::RM_STANDBY_ID => {
|
||||
@@ -375,6 +464,9 @@ impl WalIngest {
|
||||
if info == pg_constants::XLOG_RUNNING_XACTS {
|
||||
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
|
||||
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
|
||||
todo!() // checkpoint_modified=true missing?
|
||||
} else {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
pg_constants::RM_REPLORIGIN_ID => {
|
||||
@@ -387,13 +479,27 @@ impl WalIngest {
|
||||
} else if info == pg_constants::XLOG_REPLORIGIN_DROP {
|
||||
let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
|
||||
modification.drop_replorigin(xlrec.node_id).await?
|
||||
} else {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
_x => {
|
||||
// TODO: should probably log & fail here instead of blindly
|
||||
// doing something without understanding the protocol
|
||||
}
|
||||
}
|
||||
|
||||
// All of these are handled by the generic ingest_decoded_block function
|
||||
pg_constants::RM_BTREE_ID => special_treatment_check!(needs none),
|
||||
pg_constants::RM_HASH_ID => special_treatment_check!(needs none),
|
||||
pg_constants::RM_GIN_ID => special_treatment_check!(needs none),
|
||||
pg_constants::RM_GIST_ID => special_treatment_check!(needs none),
|
||||
pg_constants::RM_SEQ_ID => special_treatment_check!(needs none),
|
||||
pg_constants::RM_SPGIST_ID => special_treatment_check!(needs none),
|
||||
pg_constants::RM_BRIN_ID => special_treatment_check!(needs none),
|
||||
pg_constants::RM_GENERIC_ID => special_treatment_check!(needs none),
|
||||
|
||||
// We don't support the commit-ts tracking in neon. No harm if we see
|
||||
// these records though.
|
||||
pg_constants::RM_COMMIT_TS_ID => special_treatment_check!(needs none),
|
||||
|
||||
_x => special_treatment_check!(unknown record type, pg_version, lsn, decoded),
|
||||
};
|
||||
|
||||
// Iterate through all the blocks that the record modifies, and
|
||||
// "put" a separate copy of the record for each block.
|
||||
@@ -541,7 +647,10 @@ impl WalIngest {
|
||||
let mut old_heap_blkno: Option<u32> = None;
|
||||
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
|
||||
|
||||
match modification.tline.pg_version {
|
||||
let pg_version = modification.tline.pg_version;
|
||||
let lsn = modification.get_lsn();
|
||||
#[allow(clippy::if_same_then_else)]
|
||||
match pg_version {
|
||||
14 => {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
@@ -580,6 +689,19 @@ impl WalIngest {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_TRUNCATE {
|
||||
// per comment in heap_redo:
|
||||
// TRUNCATE is a no-op because the actions are already logged as
|
||||
// SMGR WAL records. TRUNCATE WAL record only exists for logical
|
||||
// decoding.
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_HEAP_CONFIRM
|
||||
|| info == pg_constants::XLOG_HEAP_INPLACE
|
||||
{
|
||||
// these don't update the FSM or VM, so no special handling needed.
|
||||
special_treatment_check!(needs none);
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
@@ -604,6 +726,23 @@ impl WalIngest {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP2_REWRITE
|
||||
|| info == pg_constants::XLOG_HEAP2_NEW_CID
|
||||
{
|
||||
// related to logical replication, we can ignore in storage
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_HEAP2_PRUNE
|
||||
|| info == pg_constants::XLOG_HEAP2_VACUUM
|
||||
|| info == pg_constants::XLOG_HEAP2_FREEZE_PAGE
|
||||
{
|
||||
// these don't update the VM, so no special handling needed.
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_HEAP2_VISIBLE {
|
||||
// This updates the VM, but the VM page is registered as a normal
|
||||
// block in the WAL record, so no special handling is needed.
|
||||
special_treatment_check!(needs none);
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
} else {
|
||||
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||
@@ -647,6 +786,19 @@ impl WalIngest {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_TRUNCATE {
|
||||
// per comment in heap_redo:
|
||||
// TRUNCATE is a no-op because the actions are already logged as
|
||||
// SMGR WAL records. TRUNCATE WAL record only exists for logical
|
||||
// decoding.
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_HEAP_CONFIRM
|
||||
|| info == pg_constants::XLOG_HEAP_INPLACE
|
||||
{
|
||||
// these don't update the FSM or VM, so no special handling needed.
|
||||
special_treatment_check!(needs none);
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
@@ -671,6 +823,23 @@ impl WalIngest {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP2_REWRITE
|
||||
|| info == pg_constants::XLOG_HEAP2_NEW_CID
|
||||
{
|
||||
// related to logical replication, we can ignore in storage
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_HEAP2_PRUNE
|
||||
|| info == pg_constants::XLOG_HEAP2_VACUUM
|
||||
|| info == pg_constants::XLOG_HEAP2_FREEZE_PAGE
|
||||
{
|
||||
// these don't update the VM, so no special handling needed.
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_HEAP2_VISIBLE {
|
||||
// This updates the VM, but the VM page is registered as a normal
|
||||
// block in the WAL record, so no special handling is needed.
|
||||
special_treatment_check!(needs none);
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
} else {
|
||||
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||
@@ -714,6 +883,19 @@ impl WalIngest {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_TRUNCATE {
|
||||
// per comment in heap_redo:
|
||||
// TRUNCATE is a no-op because the actions are already logged as
|
||||
// SMGR WAL records. TRUNCATE WAL record only exists for logical
|
||||
// decoding.
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_HEAP_CONFIRM
|
||||
|| info == pg_constants::XLOG_HEAP_INPLACE
|
||||
{
|
||||
// these don't update the FSM or VM, so no special handling needed.
|
||||
special_treatment_check!(needs none);
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
@@ -738,6 +920,23 @@ impl WalIngest {
|
||||
old_heap_blkno = Some(decoded.blocks[0].blkno);
|
||||
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP2_REWRITE
|
||||
|| info == pg_constants::XLOG_HEAP2_NEW_CID
|
||||
{
|
||||
// related to logical replication, we can ignore in storage
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_HEAP2_PRUNE
|
||||
|| info == pg_constants::XLOG_HEAP2_VACUUM
|
||||
|| info == pg_constants::XLOG_HEAP2_FREEZE_PAGE
|
||||
{
|
||||
// these don't update the VM, so no special handling needed.
|
||||
special_treatment_check!(needs none);
|
||||
} else if info == pg_constants::XLOG_HEAP2_VISIBLE {
|
||||
// This updates the VM, but the VM page is registered as a normal
|
||||
// block in the WAL record, so no special handling is needed.
|
||||
special_treatment_check!(needs none);
|
||||
} else {
|
||||
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
|
||||
}
|
||||
} else {
|
||||
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
|
||||
|
||||
Reference in New Issue
Block a user