mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 15:41:15 +00:00
walingest: prepare for exhaustive match and handling of no-ops
Use rust type system to force the "special treatment" block
to declare the expected outcome of special treatment.
The TODO comments will be addressed in future commits, for now
let's just encode our expectations.
This commit doesn't compile yet because a bunch of `else { ... }`
blocks are missing. I'll ask Postgres-savvy colleagues to fill
in here.
This commit is contained in:
@@ -98,16 +98,24 @@ impl<'a> WalIngest<'a> {
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
|
||||
match decoded.xl_rmid {
|
||||
enum Outcome {
|
||||
Noop,
|
||||
Modified,
|
||||
UnknownRecordType,
|
||||
}
|
||||
|
||||
let outcome = match decoded.xl_rmid {
|
||||
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
|
||||
// Heap AM records need some special handling, because they modify VM pages
|
||||
// without registering them with the standard mechanism.
|
||||
self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
}
|
||||
pg_constants::RM_NEON_ID => {
|
||||
self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
}
|
||||
// Handle other special record types
|
||||
pg_constants::RM_SMGR_ID => {
|
||||
@@ -117,10 +125,12 @@ impl<'a> WalIngest<'a> {
|
||||
let create = XlSmgrCreate::decode(&mut buf);
|
||||
self.ingest_xlog_smgr_create(modification, &create, ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
} else if info == pg_constants::XLOG_SMGR_TRUNCATE {
|
||||
let truncate = XlSmgrTruncate::decode(&mut buf);
|
||||
self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
}
|
||||
}
|
||||
pg_constants::RM_DBASE_ID => {
|
||||
@@ -134,6 +144,7 @@ impl<'a> WalIngest<'a> {
|
||||
|
||||
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
} else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
|
||||
let dropdb = XlDropDatabase::decode(&mut buf);
|
||||
for tablespace_id in dropdb.tablespace_ids {
|
||||
@@ -141,11 +152,13 @@ impl<'a> WalIngest<'a> {
|
||||
modification
|
||||
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
}
|
||||
}
|
||||
} else if self.timeline.pg_version == 15 {
|
||||
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
Outcome::Noop
|
||||
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
// The XLOG record was renamed between v14 and v15,
|
||||
// but the record format is the same.
|
||||
@@ -154,18 +167,23 @@ impl<'a> WalIngest<'a> {
|
||||
let createdb = XlCreateDatabase::decode(&mut buf);
|
||||
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
|
||||
let dropdb = XlDropDatabase::decode(&mut buf);
|
||||
let mut outcome = Outcome::Noop;
|
||||
for tablespace_id in dropdb.tablespace_ids {
|
||||
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
|
||||
modification
|
||||
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
|
||||
.await?;
|
||||
outcome = Outcome::Modified;
|
||||
}
|
||||
outcome
|
||||
}
|
||||
} else if self.timeline.pg_version == 16 {
|
||||
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
|
||||
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
|
||||
Outcome::Noop
|
||||
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
|
||||
// The XLOG record was renamed between v14 and v15,
|
||||
// but the record format is the same.
|
||||
@@ -174,19 +192,23 @@ impl<'a> WalIngest<'a> {
|
||||
let createdb = XlCreateDatabase::decode(&mut buf);
|
||||
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
|
||||
.await?;
|
||||
Outcome::Noop
|
||||
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
|
||||
let dropdb = XlDropDatabase::decode(&mut buf);
|
||||
let mut outcome = Outcome::Noop;
|
||||
for tablespace_id in dropdb.tablespace_ids {
|
||||
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
|
||||
modification
|
||||
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
|
||||
.await?;
|
||||
outcome = Outcome::Modified;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
pg_constants::RM_TBLSPC_ID => {
|
||||
trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
|
||||
Outcome::Noop
|
||||
}
|
||||
pg_constants::RM_CLOG_ID => {
|
||||
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
|
||||
@@ -204,11 +226,13 @@ impl<'a> WalIngest<'a> {
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
} else {
|
||||
assert!(info == pg_constants::CLOG_TRUNCATE);
|
||||
let xlrec = XlClogTruncate::decode(&mut buf);
|
||||
self.ingest_clog_truncate_record(modification, &xlrec, ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
}
|
||||
}
|
||||
pg_constants::RM_XACT_ID => {
|
||||
@@ -224,6 +248,7 @@ impl<'a> WalIngest<'a> {
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
} else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
|
||||
|| info == pg_constants::XLOG_XACT_ABORT_PREPARED
|
||||
{
|
||||
@@ -246,10 +271,12 @@ impl<'a> WalIngest<'a> {
|
||||
modification
|
||||
.drop_twophase_file(parsed_xact.xid, ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
} else if info == pg_constants::XLOG_XACT_PREPARE {
|
||||
modification
|
||||
.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
}
|
||||
}
|
||||
pg_constants::RM_MULTIXACT_ID => {
|
||||
@@ -268,6 +295,7 @@ impl<'a> WalIngest<'a> {
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
|
||||
let pageno = buf.get_u32_le();
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
@@ -281,19 +309,23 @@ impl<'a> WalIngest<'a> {
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
|
||||
let xlrec = XlMultiXactCreate::decode(&mut buf);
|
||||
self.ingest_multixact_create_record(modification, &xlrec)?;
|
||||
Outcome::Modified
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
|
||||
let xlrec = XlMultiXactTruncate::decode(&mut buf);
|
||||
self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
}
|
||||
}
|
||||
pg_constants::RM_RELMAP_ID => {
|
||||
let xlrec = XlRelmapUpdate::decode(&mut buf);
|
||||
self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
|
||||
.await?;
|
||||
Outcome::Modified
|
||||
}
|
||||
pg_constants::RM_XLOG_ID => {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
@@ -304,6 +336,7 @@ impl<'a> WalIngest<'a> {
|
||||
self.checkpoint.nextOid = next_oid;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
Outcome::Noop // TODO review
|
||||
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|
||||
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
@@ -324,6 +357,7 @@ impl<'a> WalIngest<'a> {
|
||||
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
Outcome::Noop
|
||||
}
|
||||
}
|
||||
pg_constants::RM_LOGICALMSG_ID => {
|
||||
@@ -341,14 +375,29 @@ impl<'a> WalIngest<'a> {
|
||||
crate::failpoint_support::sleep_millis_async!(
|
||||
"wal-ingest-logical-message-sleep"
|
||||
);
|
||||
Outcome::Noop
|
||||
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
|
||||
modification.put_file(path, message, ctx).await?;
|
||||
Outcome::Modified
|
||||
}
|
||||
}
|
||||
}
|
||||
_x => {
|
||||
_x => Outcome::UnknownRecordType,
|
||||
};
|
||||
|
||||
match outcome {
|
||||
Outcome::Noop => {
|
||||
// TODO: https://github.com/neondatabase/neon/issues/5962
|
||||
// => figure out what to do so that we still advance last_record_lsn
|
||||
}
|
||||
Outcome::Modified => {
|
||||
// TODO: https://github.com/neondatabase/neon/issues/5962
|
||||
// => assert that indeed last_record_lsn is this record's LSN
|
||||
}
|
||||
Outcome::UnknownRecordType => {
|
||||
// TODO: should probably log & fail here instead of blindly
|
||||
// doing something without understanding the protocol
|
||||
// doing something without understanding the protocol.
|
||||
// No issue exists for this yet.
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user