Compare commits

...

3 Commits

Author SHA1 Message Date
Christian Schwarz
429b188f85 self-review, some new todo!()s 2024-06-19 17:49:15 +02:00
Christian Schwarz
40a8ed28b7 Merge remote-tracking branch 'origin/main' into problame/pr-6002-stripped
Conflicts around

- https://github.com/neondatabase/neon/pull/6705
- https://github.com/neondatabase/neon/pull/7099
2024-06-19 17:35:16 +02:00
Christian Schwarz
ad130f831a walingest: log a warning once per process upon unknown record
This is a stripped-down version of https://github.com/neondatabase/neon/pull/6002,
taking the exhaustive matching of xl_rmid and xl_info that Heikki
contributed there.

Unlike the `outcome` type approach in 6002, this stripped-down version
relies on coding discipline, i.e., rustc won't complain if you forget to
add a `special_treatment_check!()`.

Future PRs should transform the code into an exhaustive match statement.
2024-01-09 10:23:13 +01:00
2 changed files with 246 additions and 17 deletions

View File

@@ -79,6 +79,8 @@ pub const XLOG_XACT_PREPARE: u8 = 0x10;
pub const XLOG_XACT_ABORT: u8 = 0x20;
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
pub const XLOG_XACT_ASSIGNMENT: u8 = 0x50;
pub const XLOG_XACT_INVALIDATIONS: u8 = 0x60;
// From standbydefs.h
pub const XLOG_RUNNING_XACTS: u8 = 0x10;
@@ -106,12 +108,6 @@ pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5;
// pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6;
// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;
// From pg_control.h and rmgrlist.h
pub const XLOG_NEXTOID: u8 = 0x30;
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_FPI_FOR_HINT: u8 = 0xA0;
pub const XLOG_FPI: u8 = 0xB0;
// From multixact.h
pub const FIRST_MULTIXACT_ID: u32 = 1;
pub const MAX_MULTIXACT_ID: u32 = 0xFFFFFFFF;
@@ -139,12 +135,20 @@ pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
// Opcodes of heap records (resource manager RM_HEAP_ID). WAL ingest extracts
// the opcode with `decoded.xl_info & XLOG_HEAP_OPMASK` and compares it against
// these values.
pub const XLOG_HEAP_INSERT: u8 = 0x00;
pub const XLOG_HEAP_DELETE: u8 = 0x10;
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
pub const XLOG_HEAP_TRUNCATE: u8 = 0x30;
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
pub const XLOG_HEAP_CONFIRM: u8 = 0x50;
pub const XLOG_HEAP_LOCK: u8 = 0x60;
pub const XLOG_HEAP_INPLACE: u8 = 0x70;
pub const XLOG_HEAP_INIT_PAGE: u8 = 0x80;
// Opcodes of heap2 records (resource manager RM_HEAP2_ID). They reuse the same
// numeric range as the XLOG_HEAP_* opcodes and are extracted with the same
// XLOG_HEAP_OPMASK, so the resource manager id must be checked first.
pub const XLOG_HEAP2_REWRITE: u8 = 0x00;
pub const XLOG_HEAP2_PRUNE: u8 = 0x10;
pub const XLOG_HEAP2_VACUUM: u8 = 0x20;
pub const XLOG_HEAP2_FREEZE_PAGE: u8 = 0x30;
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
pub const XLOG_HEAP2_LOCK_UPDATED: u8 = 0x60;
pub const XLOG_HEAP2_NEW_CID: u8 = 0x70;
pub const XLH_LOCK_ALL_FROZEN_CLEARED: u8 = 0x01;
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
@@ -167,9 +171,21 @@ pub const RM_RELMAP_ID: u8 = 7;
pub const RM_STANDBY_ID: u8 = 8;
pub const RM_HEAP2_ID: u8 = 9;
pub const RM_HEAP_ID: u8 = 10;
// Further resource manager ids. WAL ingest matches `decoded.xl_rmid` against
// the RM_* constants to decide whether a record needs special treatment.
pub const RM_BTREE_ID: u8 = 11;
pub const RM_HASH_ID: u8 = 12;
pub const RM_GIN_ID: u8 = 13;
pub const RM_GIST_ID: u8 = 14;
pub const RM_SEQ_ID: u8 = 15;
pub const RM_SPGIST_ID: u8 = 16;
pub const RM_BRIN_ID: u8 = 17;
pub const RM_COMMIT_TS_ID: u8 = 18;
pub const RM_REPLORIGIN_ID: u8 = 19;
pub const RM_GENERIC_ID: u8 = 20;
pub const RM_LOGICALMSG_ID: u8 = 21;
// from relmapper.h
pub const XLOG_RELMAP_UPDATE: u8 = 0x0;
// from neon_rmgr.h
pub const RM_NEON_ID: u8 = 134;
@@ -219,8 +235,22 @@ pub const INVALID_TRANSACTION_ID: u32 = 0;
pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000;
pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
/* pg_control.h */
// `xl_info` codes of records handled under the RM_XLOG_ID resource manager.
// WAL ingest masks `decoded.xl_info` with `XLR_RMGR_INFO_MASK` and compares
// the result against these values.
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLOG_NOOP: u8 = 0x20;
pub const XLOG_NEXTOID: u8 = 0x30;
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_BACKUP_END: u8 = 0x50;
pub const XLOG_PARAMETER_CHANGE: u8 = 0x60;
pub const XLOG_RESTORE_POINT: u8 = 0x70;
pub const XLOG_FPW_CHANGE: u8 = 0x80;
pub const XLOG_END_OF_RECOVERY: u8 = 0x90;
pub const XLOG_FPI_FOR_HINT: u8 = 0xA0;
pub const XLOG_FPI: u8 = 0xB0;
/* 0xC0 is used in Postgres 9.5-11 */
pub const XLOG_OVERWRITE_CONTRECORD: u8 = 0xD0;
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_LONG_HEADER: u16 = 0x0002;

View File

@@ -55,6 +55,36 @@ pub struct WalIngest {
checkpoint_modified: bool,
}
/// Records, at the spot where a WAL record type is classified, whether that
/// record type needs special treatment during ingestion.
///
/// Two invocation forms exist:
///
/// * `special_treatment_check!(needs none)` expands to an empty block. It has
///   no runtime effect and only documents that the record type was considered.
/// * `special_treatment_check!(unknown record type, pg_version, lsn, decoded)`
///   emits a `warn!` event with the fields `pg_version`, `lsn`, `xl_rmid` and
///   `xl_info`. The guard flag is a `static` declared inside the expansion, so
///   every invocation site owns its own flag and warns at most once.
///
/// Nothing forces a call site to use this macro; coverage relies on coding
/// discipline.
macro_rules! special_treatment_check {
    (needs none) => {{
        // Intentionally empty: this record type needs no special treatment.
    }};
    (unknown record type, $pg_version:expr, $lsn:expr, $decoded:expr) => {{
        // One flag per expansion site; set the first time this site is reached.
        static LOGGED: std::sync::atomic::AtomicBool =
            std::sync::atomic::AtomicBool::new(false);

        // The annotated bindings type-check the arguments. Their names are also
        // the field names of the event below, so they must not be renamed.
        let pg_version: u32 = $pg_version;
        let lsn: Lsn = $lsn;
        let decoded: &DecodedWALRecord = $decoded;

        // `swap` returns the previous value, so only the first caller sees `false`.
        let already_logged = LOGGED.swap(true, std::sync::atomic::Ordering::Relaxed);
        if !already_logged {
            warn!(
                pg_version,
                %lsn,
                xl_rmid = %decoded.xl_rmid,
                xl_info = %decoded.xl_info,
                "unknown WAL record type, investigate whether it needs special treatment"
            );
        }
    }};
}
impl WalIngest {
pub async fn new(
timeline: &Timeline,
@@ -111,6 +141,7 @@ impl WalIngest {
failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
#[allow(clippy::if_same_then_else)]
match decoded.xl_rmid {
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
// Heap AM records need some special handling, because they modify VM pages
@@ -134,6 +165,8 @@ impl WalIngest {
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
.await?;
} else {
special_treatment_check!(needs none);
}
}
pg_constants::RM_DBASE_ID => {
@@ -155,6 +188,8 @@ impl WalIngest {
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
} else if pg_version == 15 {
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
@@ -175,6 +210,8 @@ impl WalIngest {
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
} else if pg_version == 16 {
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
@@ -195,11 +232,16 @@ impl WalIngest {
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
}
pg_constants::RM_TBLSPC_ID => {
trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
todo!() // should we do: special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
pg_constants::RM_CLOG_ID => {
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
@@ -217,11 +259,12 @@ impl WalIngest {
ctx,
)
.await?;
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
} else if info == pg_constants::CLOG_TRUNCATE {
let xlrec = XlClogTruncate::decode(&mut buf);
self.ingest_clog_truncate_record(modification, &xlrec, ctx)
.await?;
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
}
pg_constants::RM_XACT_ID => {
@@ -265,6 +308,12 @@ impl WalIngest {
modification
.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
.await?;
} else if info == pg_constants::XLOG_XACT_ASSIGNMENT {
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_XACT_INVALIDATIONS {
special_treatment_check!(needs none);
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
}
pg_constants::RM_MULTIXACT_ID => {
@@ -303,12 +352,20 @@ impl WalIngest {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
.await?;
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
}
pg_constants::RM_RELMAP_ID => {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
.await?;
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_RELMAP_UPDATE {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
.await?;
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
}
pg_constants::RM_XLOG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
@@ -318,6 +375,8 @@ impl WalIngest {
if self.checkpoint.nextOid != next_oid {
self.checkpoint.nextOid = next_oid;
self.checkpoint_modified = true;
} else {
special_treatment_check!(needs none);
}
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
@@ -350,6 +409,31 @@ impl WalIngest {
// have some trace of the checkpoint records in the layer files at the same
// LSNs.
self.checkpoint_modified = true;
} else if info == pg_constants::XLOG_FPI || info == pg_constants::XLOG_FPI_FOR_HINT
{
// These records are important for us, but they are handled by
// generic ingest_decoded_block() function below. They don't need
// any special handling.
//
// HEIKKI: Is Noop the right code for that case?
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_NOOP
|| info == pg_constants::XLOG_NEXTOID
|| info == pg_constants::XLOG_SWITCH
|| info == pg_constants::XLOG_BACKUP_END
|| info == pg_constants::XLOG_PARAMETER_CHANGE
|| info == pg_constants::XLOG_RESTORE_POINT
|| info == pg_constants::XLOG_FPW_CHANGE
|| info == pg_constants::XLOG_END_OF_RECOVERY
{
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_OVERWRITE_CONTRECORD {
// HEIKKI: I suspect we're not handling these correctly.
// See https://github.com/neondatabase/neon/issues/934
// Given that, not sure what the right outcome is.
todo!()
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
}
pg_constants::RM_LOGICALMSG_ID => {
@@ -365,9 +449,14 @@ impl WalIngest {
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
special_treatment_check!(needs none);
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
modification.put_file(path, message, ctx).await?;
} else {
special_treatment_check!(needs none);
}
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
}
pg_constants::RM_STANDBY_ID => {
@@ -375,6 +464,9 @@ impl WalIngest {
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
todo!() // checkpoint_modified=true missing?
} else {
todo!()
}
}
pg_constants::RM_REPLORIGIN_ID => {
@@ -387,13 +479,27 @@ impl WalIngest {
} else if info == pg_constants::XLOG_REPLORIGIN_DROP {
let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
modification.drop_replorigin(xlrec.node_id).await?
} else {
todo!()
}
}
_x => {
// TODO: should probably log & fail here instead of blindly
// doing something without understanding the protocol
}
}
// All of these are handled by the generic ingest_decoded_block function
pg_constants::RM_BTREE_ID => special_treatment_check!(needs none),
pg_constants::RM_HASH_ID => special_treatment_check!(needs none),
pg_constants::RM_GIN_ID => special_treatment_check!(needs none),
pg_constants::RM_GIST_ID => special_treatment_check!(needs none),
pg_constants::RM_SEQ_ID => special_treatment_check!(needs none),
pg_constants::RM_SPGIST_ID => special_treatment_check!(needs none),
pg_constants::RM_BRIN_ID => special_treatment_check!(needs none),
pg_constants::RM_GENERIC_ID => special_treatment_check!(needs none),
// We don't support the commit-ts tracking in neon. No harm if we see
// these records though.
pg_constants::RM_COMMIT_TS_ID => special_treatment_check!(needs none),
_x => special_treatment_check!(unknown record type, pg_version, lsn, decoded),
};
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
@@ -541,7 +647,10 @@ impl WalIngest {
let mut old_heap_blkno: Option<u32> = None;
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
match modification.tline.pg_version {
let pg_version = modification.tline.pg_version;
let lsn = modification.get_lsn();
#[allow(clippy::if_same_then_else)]
match pg_version {
14 => {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -580,6 +689,19 @@ impl WalIngest {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
} else if info == pg_constants::XLOG_HEAP_TRUNCATE {
// per comment in heap_redo:
// TRUNCATE is a no-op because the actions are already logged as
// SMGR WAL records. TRUNCATE WAL record only exists for logical
// decoding.
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_HEAP_CONFIRM
|| info == pg_constants::XLOG_HEAP_INPLACE
{
// these don't update the FSM or VM, so no special handling needed.
special_treatment_check!(needs none);
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -604,6 +726,23 @@ impl WalIngest {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
} else if info == pg_constants::XLOG_HEAP2_REWRITE
|| info == pg_constants::XLOG_HEAP2_NEW_CID
{
// related to logical replication, we can ignore in storage
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_HEAP2_PRUNE
|| info == pg_constants::XLOG_HEAP2_VACUUM
|| info == pg_constants::XLOG_HEAP2_FREEZE_PAGE
{
// these don't update the VM, so no special handling needed.
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_HEAP2_VISIBLE {
// This updates the VM, but the VM page is registered as a normal
// block in the WAL record, so no special handling is needed.
special_treatment_check!(needs none);
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -647,6 +786,19 @@ impl WalIngest {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
} else if info == pg_constants::XLOG_HEAP_TRUNCATE {
// per comment in heap_redo:
// TRUNCATE is a no-op because the actions are already logged as
// SMGR WAL records. TRUNCATE WAL record only exists for logical
// decoding.
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_HEAP_CONFIRM
|| info == pg_constants::XLOG_HEAP_INPLACE
{
// these don't update the FSM or VM, so no special handling needed.
special_treatment_check!(needs none);
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -671,6 +823,23 @@ impl WalIngest {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
} else if info == pg_constants::XLOG_HEAP2_REWRITE
|| info == pg_constants::XLOG_HEAP2_NEW_CID
{
// related to logical replication, we can ignore in storage
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_HEAP2_PRUNE
|| info == pg_constants::XLOG_HEAP2_VACUUM
|| info == pg_constants::XLOG_HEAP2_FREEZE_PAGE
{
// these don't update the VM, so no special handling needed.
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_HEAP2_VISIBLE {
// This updates the VM, but the VM page is registered as a normal
// block in the WAL record, so no special handling is needed.
special_treatment_check!(needs none);
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -714,6 +883,19 @@ impl WalIngest {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
} else if info == pg_constants::XLOG_HEAP_TRUNCATE {
// per comment in heap_redo:
// TRUNCATE is a no-op because the actions are already logged as
// SMGR WAL records. TRUNCATE WAL record only exists for logical
// decoding.
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_HEAP_CONFIRM
|| info == pg_constants::XLOG_HEAP_INPLACE
{
// these don't update the FSM or VM, so no special handling needed.
special_treatment_check!(needs none);
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -738,6 +920,23 @@ impl WalIngest {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
} else if info == pg_constants::XLOG_HEAP2_REWRITE
|| info == pg_constants::XLOG_HEAP2_NEW_CID
{
// related to logical replication, we can ignore in storage
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_HEAP2_PRUNE
|| info == pg_constants::XLOG_HEAP2_VACUUM
|| info == pg_constants::XLOG_HEAP2_FREEZE_PAGE
{
// these don't update the VM, so no special handling needed.
special_treatment_check!(needs none);
} else if info == pg_constants::XLOG_HEAP2_VISIBLE {
// This updates the VM, but the VM page is registered as a normal
// block in the WAL record, so no special handling is needed.
special_treatment_check!(needs none);
} else {
special_treatment_check!(unknown record type, pg_version, lsn, decoded);
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);