walingest: refactor if-cascade on decoded.xl_rmid into match statement (#5974)

refs https://github.com/neondatabase/neon/issues/5962

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
This commit is contained in:
Christian Schwarz
2023-11-30 15:07:41 +01:00
committed by GitHub
parent 5d3c3636fc
commit 3bb1030f5d

View File

@@ -98,261 +98,258 @@ impl<'a> WalIngest<'a> {
self.checkpoint_modified = true;
}
// Heap AM records need some special handling, because they modify VM pages
// without registering them with the standard mechanism.
if decoded.xl_rmid == pg_constants::RM_HEAP_ID
|| decoded.xl_rmid == pg_constants::RM_HEAP2_ID
{
self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
.await?;
}
if decoded.xl_rmid == pg_constants::RM_NEON_ID {
self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
.await?;
}
// Handle other special record types
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_CREATE
{
let create = XlSmgrCreate::decode(&mut buf);
self.ingest_xlog_smgr_create(modification, &create, ctx)
.await?;
} else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
.await?;
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
debug!(
"handle RM_DBASE_ID for Postgres version {:?}",
self.timeline.pg_version
);
if self.timeline.pg_version == 14 {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&mut buf);
debug!("XLOG_DBASE_CREATE v14");
match decoded.xl_rmid {
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
// Heap AM records need some special handling, because they modify VM pages
// without registering them with the standard mechanism.
self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
.await?;
}
pg_constants::RM_NEON_ID => {
self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
.await?;
}
// Handle other special record types
pg_constants::RM_SMGR_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
if info == pg_constants::XLOG_SMGR_CREATE {
let create = XlSmgrCreate::decode(&mut buf);
self.ingest_xlog_smgr_create(modification, &create, ctx)
.await?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v14::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
}
} else if self.timeline.pg_version == 15 {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG
{
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY
{
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
} else if info == pg_constants::XLOG_SMGR_TRUNCATE {
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
.await?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
}
} else if self.timeline.pg_version == 16 {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG
{
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY
{
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v16::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
}
}
} else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
} else if decoded.xl_rmid == pg_constants::RM_CLOG_ID {
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
SlruKind::Clog,
segno,
rpageno,
ZERO_PAGE.clone(),
ctx,
)
.await?;
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
let xlrec = XlClogTruncate::decode(&mut buf);
self.ingest_clog_truncate_record(modification, &xlrec, ctx)
.await?;
}
} else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
ctx,
)
.await?;
} else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
|| info == pg_constants::XLOG_XACT_ABORT_PREPARED
{
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
ctx,
)
.await?;
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
trace!(
"Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
decoded.xl_xid,
parsed_xact.xid,
lsn,
);
modification
.drop_twophase_file(parsed_xact.xid, ctx)
.await?;
} else if info == pg_constants::XLOG_XACT_PREPARE {
modification
.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
.await?;
}
} else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
pg_constants::RM_DBASE_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
debug!(%info, pg_version=%self.timeline.pg_version, "handle RM_DBASE_ID");
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
SlruKind::MultiXactOffsets,
segno,
rpageno,
ZERO_PAGE.clone(),
ctx,
)
.await?;
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
SlruKind::MultiXactMembers,
segno,
rpageno,
ZERO_PAGE.clone(),
ctx,
)
.await?;
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
self.ingest_multixact_create_record(modification, &xlrec)?;
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
if self.timeline.pg_version == 14 {
if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
let createdb = XlCreateDatabase::decode(&mut buf);
debug!("XLOG_DBASE_CREATE v14");
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
} else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
}
} else if self.timeline.pg_version == 15 {
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
}
} else if self.timeline.pg_version == 16 {
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
}
}
}
pg_constants::RM_TBLSPC_ID => {
trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
}
pg_constants::RM_CLOG_ID => {
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
SlruKind::Clog,
segno,
rpageno,
ZERO_PAGE.clone(),
ctx,
)
.await?;
}
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
.await?;
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if self.checkpoint.nextOid != next_oid {
self.checkpoint.nextOid = next_oid;
self.checkpoint_modified = true;
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
let xlrec = XlClogTruncate::decode(&mut buf);
self.ingest_clog_truncate_record(modification, &xlrec, ctx)
.await?;
}
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
self.checkpoint.oldestXid
);
if (self
.checkpoint
.oldestXid
.wrapping_sub(xlog_checkpoint.oldestXid) as i32)
< 0
}
pg_constants::RM_XACT_ID => {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
ctx,
)
.await?;
} else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
|| info == pg_constants::XLOG_XACT_ABORT_PREPARED
{
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
self.checkpoint_modified = true;
}
}
} else if decoded.xl_rmid == pg_constants::RM_LOGICALMSG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_LOGICAL_MESSAGE {
let xlrec = XlLogicalMessage::decode(&mut buf);
let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
if prefix == "neon-test" {
// This is a convenient way to make the WAL ingestion pause at
// particular point in the WAL. For more fine-grained control,
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
crate::failpoint_support::sleep_millis_async!(
"wal-ingest-logical-message-sleep"
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
ctx,
)
.await?;
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
trace!(
"Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
decoded.xl_xid,
parsed_xact.xid,
lsn,
);
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
modification.put_file(path, message, ctx).await?;
modification
.drop_twophase_file(parsed_xact.xid, ctx)
.await?;
} else if info == pg_constants::XLOG_XACT_PREPARE {
modification
.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
.await?;
}
}
pg_constants::RM_MULTIXACT_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
SlruKind::MultiXactOffsets,
segno,
rpageno,
ZERO_PAGE.clone(),
ctx,
)
.await?;
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
modification,
SlruKind::MultiXactMembers,
segno,
rpageno,
ZERO_PAGE.clone(),
ctx,
)
.await?;
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
self.ingest_multixact_create_record(modification, &xlrec)?;
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
.await?;
}
}
pg_constants::RM_RELMAP_ID => {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
.await?;
}
pg_constants::RM_XLOG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if self.checkpoint.nextOid != next_oid {
self.checkpoint.nextOid = next_oid;
self.checkpoint_modified = true;
}
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
self.checkpoint.oldestXid
);
if (self
.checkpoint
.oldestXid
.wrapping_sub(xlog_checkpoint.oldestXid) as i32)
< 0
{
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
self.checkpoint_modified = true;
}
}
}
pg_constants::RM_LOGICALMSG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_LOGICAL_MESSAGE {
let xlrec = XlLogicalMessage::decode(&mut buf);
let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
if prefix == "neon-test" {
// This is a convenient way to make the WAL ingestion pause at
// particular point in the WAL. For more fine-grained control,
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
crate::failpoint_support::sleep_millis_async!(
"wal-ingest-logical-message-sleep"
);
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
modification.put_file(path, message, ctx).await?;
}
}
}
_x => {
// TODO: should probably log & fail here instead of blindly
// doing something without understanding the protocol
}
}
// Iterate through all the blocks that the record modifies, and