rename types, fix stray ')', rustfmt

This commit is contained in:
Christian Schwarz
2024-01-03 17:46:08 +00:00
parent 7583e0a696
commit 5a4765f1f2

View File

@@ -52,6 +52,18 @@ pub struct WalIngest<'a> {
checkpoint_modified: bool,
}
enum IngestRecordOutcome {
/// The record has been stored in the repository and last_record_lsn has been advanced.
/// This is the common case.
Stored,
/// Pageserver knows this record type, but it is a no-op for Pageserver.
/// Processing of the record didn't have any side-effects,
/// particularly not on the repository or last_record_lsn state.
Noop,
/// Pageserver does not know this record type.
UnknownRecordType,
}
impl<'a> WalIngest<'a> {
pub async fn new(
timeline: &'a Timeline,
@@ -98,12 +110,6 @@ impl<'a> WalIngest<'a> {
self.checkpoint_modified = true;
}
enum Outcome {
Noop,
Modified,
UnknownRecordType,
}
let outcome = match decoded.xl_rmid {
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
// Heap AM records need some special handling, because they modify VM pages
@@ -114,7 +120,7 @@ impl<'a> WalIngest<'a> {
pg_constants::RM_NEON_ID => {
self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
}
// Handle other special record types
pg_constants::RM_SMGR_ID => {
@@ -124,14 +130,14 @@ impl<'a> WalIngest<'a> {
let create = XlSmgrCreate::decode(&mut buf);
self.ingest_xlog_smgr_create(modification, &create, ctx)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else if info == pg_constants::XLOG_SMGR_TRUNCATE {
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
}
pg_constants::RM_DBASE_ID => {
@@ -145,7 +151,7 @@ impl<'a> WalIngest<'a> {
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(&mut buf);
// HEIKKI: I think 0 tablespaces cannot happen.
@@ -154,15 +160,15 @@ impl<'a> WalIngest<'a> {
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
}
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
} else if self.timeline.pg_version == 15 {
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
@@ -171,26 +177,26 @@ impl<'a> WalIngest<'a> {
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(&mut buf);
let mut outcome = Outcome::Noop;
let mut outcome = IngestRecordOutcome::Noop;
// HEIKKI: I think 0 tablespaces cannot happen.
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
outcome = Outcome::Modified;
outcome = IngestRecordOutcome::Stored;
}
outcome
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
} else if self.timeline.pg_version == 16 {
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
@@ -199,26 +205,26 @@ impl<'a> WalIngest<'a> {
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(&mut buf);
let mut outcome = Outcome::Noop;
let mut outcome = IngestRecordOutcome::Noop;
// HEIKKI: I think 0 tablespaces cannot happen.
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
outcome = Outcome::Modified;
outcome = IngestRecordOutcome::Stored;
}
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
}
}
pg_constants::RM_TBLSPC_ID => {
trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
Outcome::Noop
IngestRecordOutcome::Noop
}
pg_constants::RM_CLOG_ID => {
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
@@ -236,14 +242,14 @@ impl<'a> WalIngest<'a> {
ctx,
)
.await?;
Outcome::Modified
} else if info == pg_constants::CLOG_TRUNCATE) {
IngestRecordOutcome::Stored
} else if info == pg_constants::CLOG_TRUNCATE {
let xlrec = XlClogTruncate::decode(&mut buf);
self.ingest_clog_truncate_record(modification, &xlrec, ctx)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
}
pg_constants::RM_XACT_ID => {
@@ -259,7 +265,7 @@ impl<'a> WalIngest<'a> {
ctx,
)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
|| info == pg_constants::XLOG_XACT_ABORT_PREPARED
{
@@ -282,16 +288,16 @@ impl<'a> WalIngest<'a> {
modification
.drop_twophase_file(parsed_xact.xid, ctx)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else if info == pg_constants::XLOG_XACT_PREPARE {
modification
.put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else if info == pg_constants::XLOG_XACT_ASSIGNMENT {
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_XACT_INVALIDATIONS {
Outcome::Noop
IngestRecordOutcome::Noop
}
}
pg_constants::RM_MULTIXACT_ID => {
@@ -310,7 +316,7 @@ impl<'a> WalIngest<'a> {
ctx,
)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
@@ -324,18 +330,18 @@ impl<'a> WalIngest<'a> {
ctx,
)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
self.ingest_multixact_create_record(modification, &xlrec)?;
Outcome::Modified
IngestRecordOutcome::Stored
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
}
pg_constants::RM_RELMAP_ID => {
@@ -345,9 +351,9 @@ impl<'a> WalIngest<'a> {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
.await?;
Outcome::Modified
IngestRecordOutcome::Stored
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
}
pg_constants::RM_XLOG_ID => {
@@ -359,7 +365,7 @@ impl<'a> WalIngest<'a> {
self.checkpoint.nextOid = next_oid;
self.checkpoint_modified = true;
}
Outcome::Noop // TODO review
IngestRecordOutcome::Noop // TODO review
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
@@ -380,16 +386,15 @@ impl<'a> WalIngest<'a> {
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
self.checkpoint_modified = true;
}
Outcome::Noop
} else if info == pg_constants::XLOG_FPI
|| info == pg_constants::XLOG_FPI_FOR_HINT
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_FPI || info == pg_constants::XLOG_FPI_FOR_HINT
{
// These records are importan for us, bu they are handled by
// generic ingest_decoded_block() function below. They don't need
// any special handling.
//
// HEIKKI: Is Noop the right code for that case?
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_NOOP
|| info == pg_constants::XLOG_NEXTOID
|| info == pg_constants::XLOG_SWITCH
@@ -399,13 +404,13 @@ impl<'a> WalIngest<'a> {
|| info == pg_constants::XLOG_FPW_CHANGE
|| info == pg_constants::XLOG_END_OF_RECOVERY
{
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_OVERWRITE_CONTRECORD {
// HEIKKI: I suspect we're not handling these correctly.
// See https://github.com/neondatabase/neon/issues/934
// Given that, not sure what the right outcome is.
} else {
Outcome::UnexpectedRecordType
IngestRecordOutcome::UnexpectedRecordType
}
}
pg_constants::RM_LOGICALMSG_ID => {
@@ -423,48 +428,48 @@ impl<'a> WalIngest<'a> {
crate::failpoint_support::sleep_millis_async!(
"wal-ingest-logical-message-sleep"
);
Outcome::Noop
IngestRecordOutcome::Noop
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
modification.put_file(path, message, ctx).await?;
Outcome::Modified
IngestRecordOutcome::Stored
}
} else {
Outcome::UnexpectedRecordType
IngestRecordOutcome::UnexpectedRecordType
}
}
pg_constants::RM_STANDBY_ID => Outcome::Noop,
pg_constants::RM_STANDBY_ID => IngestRecordOutcome::Noop,
// All of these are handled by the generic ingest_decoded_block function
pg_constants::RM_BTREE_ID => Outcome::Noop,
pg_constants::RM_HASH_ID => Outcome::Noop,
pg_constants::RM_GIN_ID => Outcome::Noop,
pg_constants::RM_GIST_ID => Outcome::Noop,
pg_constants::RM_SEQ_ID => Outcome::Noop,
pg_constants::RM_SPGIST_ID => Outcome::Noop,
pg_constants::RM_BRIN_ID => Outcome::Noop,
pg_constants::RM_GENERIC_ID => Outcome::Noop,
pg_constants::RM_BTREE_ID => IngestRecordOutcome::Noop,
pg_constants::RM_HASH_ID => IngestRecordOutcome::Noop,
pg_constants::RM_GIN_ID => IngestRecordOutcome::Noop,
pg_constants::RM_GIST_ID => IngestRecordOutcome::Noop,
pg_constants::RM_SEQ_ID => IngestRecordOutcome::Noop,
pg_constants::RM_SPGIST_ID => IngestRecordOutcome::Noop,
pg_constants::RM_BRIN_ID => IngestRecordOutcome::Noop,
pg_constants::RM_GENERIC_ID => IngestRecordOutcome::Noop,
// We don't support the commit-ts tracking in neon. No harm if we see
// these records though.
pg_constants::RM_COMMIT_TS_ID => Outcome::Noop,
pg_constants::RM_COMMIT_TS_ID => IngestRecordOutcome::Noop,
// These are related to logical replication. I don't know if we should
// do something with them. @knizhnik?
pg_constants::RM_REPLORIGIN_ID => Outcome::Noop,
pg_constants::RM_REPLORIGIN_ID => IngestRecordOutcome::Noop,
_x => Outcome::UnknownRecordType,
_x => IngestRecordOutcome::UnknownRecordType,
};
match outcome {
Outcome::Noop => {
IngestRecordOutcome::Noop => {
// TODO: https://github.com/neondatabase/neon/issues/5962
// => figure out what to do so that we still advance last_record_lsn
}
Outcome::Modified => {
IngestRecordOutcome::Stored => {
// TODO: https://github.com/neondatabase/neon/issues/5962
// => assert that indeed last_record_lsn is this record's LSN
}
Outcome::UnknownRecordType => {
IngestRecordOutcome::UnknownRecordType => {
// TODO: should probably log & fail here instead of blindly
// doing something without understanding the protocol.
// No issue exists for this yet.
@@ -561,7 +566,7 @@ impl<'a> WalIngest<'a> {
modification: &mut DatadirModification<'_>,
decoded: &DecodedWALRecord,
ctx: &RequestContext,
) -> anyhow::Result<Outcome> {
) -> anyhow::Result<IngestRecordOutcome> {
// Handle VM bit updates that are implicitly part of heap records.
// First, look at the record to determine which VM bits need
@@ -612,17 +617,17 @@ impl<'a> WalIngest<'a> {
}
} else if info == pg_constants::XLOG_HEAP_TRUNCATE {
// per comment in heap_redo:
// TRUNCATE is a no-op because the actions are already logged as
// SMGR WAL records. TRUNCATE WAL record only exists for logical
// decoding.
Outcome::Noop
// TRUNCATE is a no-op because the actions are already logged as
// SMGR WAL records. TRUNCATE WAL record only exists for logical
// decoding.
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_HEAP_CONFIRM
|| info == pg_constants::XLOG_HEAP_INPLACE
{
// these don't update the FSM or VM, so no special handling needed.
Outcome::Noop
IngestRecordOutcome::Noop
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -651,19 +656,19 @@ impl<'a> WalIngest<'a> {
|| info == pg_constants::XLOG_HEAP2_NEW_CID
{
// related to logical replication, we can ignore in storage
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_HEAP2_PRUNE
|| info == pg_constants::XLOG_HEAP2_VACUUM
|| info == pg_constants::XLOG_HEAP2_FREEZE_PAGE
{
// these don't update the VM, so no special handling needed.
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_HEAP2_VISIBLE {
// This updates the VM, but the VM page is registered as a normal
// block in the WAL record, so no special handling is needed.
Outcome::Noop
IngestRecordOutcome::Noop
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -707,20 +712,20 @@ impl<'a> WalIngest<'a> {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
} else if info == pg_constants::XLOG_HEAP_TRUNCATE {
} else if info == pg_constants::XLOG_HEAP_TRUNCATE {
// per comment in heap_redo:
// TRUNCATE is a no-op because the actions are already logged as
// SMGR WAL records. TRUNCATE WAL record only exists for logical
// decoding.
Outcome::Noop
// TRUNCATE is a no-op because the actions are already logged as
// SMGR WAL records. TRUNCATE WAL record only exists for logical
// decoding.
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_HEAP_CONFIRM
|| info == pg_constants::XLOG_HEAP_INPLACE
{
// these don't update the FSM or VM, so no special handling needed.
Outcome::Noop
IngestRecordOutcome::Noop
} else {
Outcome::UnknownRecordType
}
IngestRecordOutcome::UnknownRecordType
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
@@ -748,19 +753,19 @@ impl<'a> WalIngest<'a> {
|| info == pg_constants::XLOG_HEAP2_NEW_CID
{
// related to logical replication, we can ignore in storage
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_HEAP2_PRUNE
|| info == pg_constants::XLOG_HEAP2_VACUUM
|| info == pg_constants::XLOG_HEAP2_FREEZE_PAGE
{
// these don't update the VM, so no special handling needed.
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_HEAP2_VISIBLE {
// This updates the VM, but the VM page is registered as a normal
// block in the WAL record, so no special handling is needed.
Outcome::Noop
IngestRecordOutcome::Noop
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -806,17 +811,17 @@ impl<'a> WalIngest<'a> {
}
} else if info == pg_constants::XLOG_HEAP_TRUNCATE {
// per comment in heap_redo:
// TRUNCATE is a no-op because the actions are already logged as
// SMGR WAL records. TRUNCATE WAL record only exists for logical
// decoding.
Outcome::Noop
// TRUNCATE is a no-op because the actions are already logged as
// SMGR WAL records. TRUNCATE WAL record only exists for logical
// decoding.
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_HEAP_CONFIRM
|| info == pg_constants::XLOG_HEAP_INPLACE
{
// these don't update the FSM or VM, so no special handling needed.
Outcome::Noop
IngestRecordOutcome::Noop
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -845,19 +850,19 @@ impl<'a> WalIngest<'a> {
|| info == pg_constants::XLOG_HEAP2_NEW_CID
{
// related to logical replication, we can ignore in storage
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_HEAP2_PRUNE
|| info == pg_constants::XLOG_HEAP2_VACUUM
|| info == pg_constants::XLOG_HEAP2_FREEZE_PAGE
{
// these don't update the VM, so no special handling needed.
Outcome::Noop
IngestRecordOutcome::Noop
} else if info == pg_constants::XLOG_HEAP2_VISIBLE {
// This updates the VM, but the VM page is registered as a normal
// block in the WAL record, so no special handling is needed.
Outcome::Noop
IngestRecordOutcome::Noop
} else {
Outcome::UnknownRecordType
IngestRecordOutcome::UnknownRecordType
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);