diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index befa21af73..27b1d116c0 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -52,6 +52,18 @@ pub struct WalIngest<'a> { checkpoint_modified: bool, } +enum IngestRecordOutcome { + /// The record has been stored in the repository and last_record_lsn has been advanced. + /// This is the common case. + Stored, + /// Pageserver knows this record type, but it is a no-op for Pageserver. + /// Processing of the record didn't have any side-effects, + /// particularly not on the repository or last_record_lsn state. + Noop, + /// Pageserver does not know this record type. + UnknownRecordType, +} + impl<'a> WalIngest<'a> { pub async fn new( timeline: &'a Timeline, @@ -98,12 +110,6 @@ impl<'a> WalIngest<'a> { self.checkpoint_modified = true; } - enum Outcome { - Noop, - Modified, - UnknownRecordType, - } - let outcome = match decoded.xl_rmid { pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => { // Heap AM records need some special handling, because they modify VM pages @@ -114,7 +120,7 @@ impl<'a> WalIngest<'a> { pg_constants::RM_NEON_ID => { self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } // Handle other special record types pg_constants::RM_SMGR_ID => { @@ -124,14 +130,14 @@ impl<'a> WalIngest<'a> { let create = XlSmgrCreate::decode(&mut buf); self.ingest_xlog_smgr_create(modification, &create, ctx) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else if info == pg_constants::XLOG_SMGR_TRUNCATE { let truncate = XlSmgrTruncate::decode(&mut buf); self.ingest_xlog_smgr_truncate(modification, &truncate, ctx) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } pg_constants::RM_DBASE_ID => { @@ -145,7 +151,7 @@ impl<'a> WalIngest<'a> { self.ingest_xlog_dbase_create(modification, &createdb, ctx) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); // HEIKKI: I think 0 tablespaces cannot happen. @@ -154,15 +160,15 @@ impl<'a> WalIngest<'a> { modification .drop_dbdir(tablespace_id, dropdb.db_id, ctx) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } else if self.timeline.pg_version == 15 { if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - Outcome::Noop + IngestRecordOutcome::Noop } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { // The XLOG record was renamed between v14 and v15, // but the record format is the same. @@ -171,26 +177,26 @@ impl<'a> WalIngest<'a> { let createdb = XlCreateDatabase::decode(&mut buf); self.ingest_xlog_dbase_create(modification, &createdb, ctx) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); - let mut outcome = Outcome::Noop; + let mut outcome = IngestRecordOutcome::Noop; // HEIKKI: I think 0 tablespaces cannot happen. for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); modification .drop_dbdir(tablespace_id, dropdb.db_id, ctx) .await?; - outcome = Outcome::Modified; + outcome = IngestRecordOutcome::Stored; } outcome } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } else if self.timeline.pg_version == 16 { if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - Outcome::Noop + IngestRecordOutcome::Noop } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { // The XLOG record was renamed between v14 and v15, // but the record format is the same. @@ -199,26 +205,26 @@ impl<'a> WalIngest<'a> { let createdb = XlCreateDatabase::decode(&mut buf); self.ingest_xlog_dbase_create(modification, &createdb, ctx) .await?; - Outcome::Noop + IngestRecordOutcome::Noop } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); - let mut outcome = Outcome::Noop; + let mut outcome = IngestRecordOutcome::Noop; // HEIKKI: I think 0 tablespaces cannot happen. for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); modification .drop_dbdir(tablespace_id, dropdb.db_id, ctx) .await?; - outcome = Outcome::Modified; + outcome = IngestRecordOutcome::Stored; } } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } } pg_constants::RM_TBLSPC_ID => { trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); - Outcome::Noop + IngestRecordOutcome::Noop } pg_constants::RM_CLOG_ID => { let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; @@ -236,14 +242,14 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; - Outcome::Modified - } else if info == pg_constants::CLOG_TRUNCATE) { + IngestRecordOutcome::Stored + } else if info == pg_constants::CLOG_TRUNCATE { let xlrec = XlClogTruncate::decode(&mut buf); self.ingest_clog_truncate_record(modification, &xlrec, ctx) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } pg_constants::RM_XACT_ID => { @@ -259,7 +265,7 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED || info == pg_constants::XLOG_XACT_ABORT_PREPARED { @@ -282,16 +288,16 @@ impl<'a> WalIngest<'a> { modification .drop_twophase_file(parsed_xact.xid, ctx) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else if info == pg_constants::XLOG_XACT_PREPARE { modification .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else if info == pg_constants::XLOG_XACT_ASSIGNMENT { - Outcome::Noop + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_XACT_INVALIDATIONS { - Outcome::Noop + IngestRecordOutcome::Noop } } pg_constants::RM_MULTIXACT_ID => { @@ -310,7 +316,7 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { let pageno = buf.get_u32_le(); let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; @@ -324,18 +330,18 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { let xlrec = XlMultiXactCreate::decode(&mut buf); self.ingest_multixact_create_record(modification, &xlrec)?; - Outcome::Modified + IngestRecordOutcome::Stored } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { let xlrec = XlMultiXactTruncate::decode(&mut buf); self.ingest_multixact_truncate_record(modification, &xlrec, ctx) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } pg_constants::RM_RELMAP_ID => { @@ -345,9 +351,9 @@ impl<'a> WalIngest<'a> { let xlrec = XlRelmapUpdate::decode(&mut buf); self.ingest_relmap_page(modification, &xlrec, decoded, ctx) .await?; - Outcome::Modified + IngestRecordOutcome::Stored } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } pg_constants::RM_XLOG_ID => { @@ -359,7 +365,7 @@ impl<'a> WalIngest<'a> { self.checkpoint.nextOid = next_oid; self.checkpoint_modified = true; } - Outcome::Noop // TODO review + IngestRecordOutcome::Noop // TODO review } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN { @@ -380,16 +386,15 @@ impl<'a> WalIngest<'a> { self.checkpoint.oldestXid = xlog_checkpoint.oldestXid; self.checkpoint_modified = true; } - Outcome::Noop - } else if info == pg_constants::XLOG_FPI - || info == pg_constants::XLOG_FPI_FOR_HINT + IngestRecordOutcome::Noop + } else if info == pg_constants::XLOG_FPI || info == pg_constants::XLOG_FPI_FOR_HINT { // These records are importan for us, bu they are handled by // generic ingest_decoded_block() function below. They don't need // any special handling. // // HEIKKI: Is Noop the right code for that case? - Outcome::Noop + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_NOOP || info == pg_constants::XLOG_NEXTOID || info == pg_constants::XLOG_SWITCH @@ -399,13 +404,13 @@ impl<'a> WalIngest<'a> { || info == pg_constants::XLOG_FPW_CHANGE || info == pg_constants::XLOG_END_OF_RECOVERY { - Outcome::Noop + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_OVERWRITE_CONTRECORD { // HEIKKI: I suspect we're not handling these correctly. // See https://github.com/neondatabase/neon/issues/934 // Given that, not sure what the right outcome is. } else { - Outcome::UnexpectedRecordType + IngestRecordOutcome::UnexpectedRecordType } } pg_constants::RM_LOGICALMSG_ID => { @@ -423,48 +428,48 @@ impl<'a> WalIngest<'a> { crate::failpoint_support::sleep_millis_async!( "wal-ingest-logical-message-sleep" ); - Outcome::Noop + IngestRecordOutcome::Noop } else if let Some(path) = prefix.strip_prefix("neon-file:") { modification.put_file(path, message, ctx).await?; - Outcome::Modified + IngestRecordOutcome::Stored } } else { - Outcome::UnexpectedRecordType + IngestRecordOutcome::UnexpectedRecordType } } - pg_constants::RM_STANDBY_ID => Outcome::Noop, + pg_constants::RM_STANDBY_ID => IngestRecordOutcome::Noop, // All of these are handled by the generic ingest_decoded_block function - pg_constants::RM_BTREE_ID => Outcome::Noop, - pg_constants::RM_HASH_ID => Outcome::Noop, - pg_constants::RM_GIN_ID => Outcome::Noop, - pg_constants::RM_GIST_ID => Outcome::Noop, - pg_constants::RM_SEQ_ID => Outcome::Noop, - pg_constants::RM_SPGIST_ID => Outcome::Noop, - pg_constants::RM_BRIN_ID => Outcome::Noop, - pg_constants::RM_GENERIC_ID => Outcome::Noop, + pg_constants::RM_BTREE_ID => IngestRecordOutcome::Noop, + pg_constants::RM_HASH_ID => IngestRecordOutcome::Noop, + pg_constants::RM_GIN_ID => IngestRecordOutcome::Noop, + pg_constants::RM_GIST_ID => IngestRecordOutcome::Noop, + pg_constants::RM_SEQ_ID => IngestRecordOutcome::Noop, + pg_constants::RM_SPGIST_ID => IngestRecordOutcome::Noop, + pg_constants::RM_BRIN_ID => IngestRecordOutcome::Noop, + pg_constants::RM_GENERIC_ID => IngestRecordOutcome::Noop, // We don't support the commit-ts tracking in neon. No harm if we see // these records though. - pg_constants::RM_COMMIT_TS_ID => Outcome::Noop, + pg_constants::RM_COMMIT_TS_ID => IngestRecordOutcome::Noop, // These are related to logical replication. I don't know if we should // do something with them. @knizhnik? - pg_constants::RM_REPLORIGIN_ID => Outcome::Noop, + pg_constants::RM_REPLORIGIN_ID => IngestRecordOutcome::Noop, - _x => Outcome::UnknownRecordType, + _x => IngestRecordOutcome::UnknownRecordType, }; match outcome { - Outcome::Noop => { + IngestRecordOutcome::Noop => { // TODO: https://github.com/neondatabase/neon/issues/5962 // => figure out what to do so that we still advance last_record_lsn } - Outcome::Modified => { + IngestRecordOutcome::Stored => { // TODO: https://github.com/neondatabase/neon/issues/5962 // => assert that indeed last_record_lsn is this record's LSN } - Outcome::UnknownRecordType => { + IngestRecordOutcome::UnknownRecordType => { // TODO: should probably log & fail here instead of blindly // doing something without understanding the protocol. // No issue exists for this yet. @@ -561,7 +566,7 @@ impl<'a> WalIngest<'a> { modification: &mut DatadirModification<'_>, decoded: &DecodedWALRecord, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> anyhow::Result { // Handle VM bit updates that are implicitly part of heap records. // First, look at the record to determine which VM bits need @@ -612,17 +617,17 @@ impl<'a> WalIngest<'a> { } } else if info == pg_constants::XLOG_HEAP_TRUNCATE { // per comment in heap_redo: - // TRUNCATE is a no-op because the actions are already logged as - // SMGR WAL records. TRUNCATE WAL record only exists for logical - // decoding. - Outcome::Noop + // TRUNCATE is a no-op because the actions are already logged as + // SMGR WAL records. TRUNCATE WAL record only exists for logical + // decoding. + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_HEAP_CONFIRM || info == pg_constants::XLOG_HEAP_INPLACE { // these don't update the FSM or VM, so no special handling needed. - Outcome::Noop + IngestRecordOutcome::Noop } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; @@ -651,19 +656,19 @@ impl<'a> WalIngest<'a> { || info == pg_constants::XLOG_HEAP2_NEW_CID { // related to logical replication, we can ignore in storage - Outcome::Noop + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_HEAP2_PRUNE || info == pg_constants::XLOG_HEAP2_VACUUM || info == pg_constants::XLOG_HEAP2_FREEZE_PAGE { // these don't update the VM, so no special handling needed. - Outcome::Noop + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_HEAP2_VISIBLE { // This updates the VM, but the VM page is registered as a normal // block in the WAL record, so no special handling is needed. - Outcome::Noop + IngestRecordOutcome::Noop } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } else { bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); @@ -707,20 +712,20 @@ impl<'a> WalIngest<'a> { old_heap_blkno = Some(decoded.blocks[0].blkno); flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; } - } else if info == pg_constants::XLOG_HEAP_TRUNCATE { + } else if info == pg_constants::XLOG_HEAP_TRUNCATE { // per comment in heap_redo: - // TRUNCATE is a no-op because the actions are already logged as - // SMGR WAL records. TRUNCATE WAL record only exists for logical - // decoding. - Outcome::Noop + // TRUNCATE is a no-op because the actions are already logged as + // SMGR WAL records. TRUNCATE WAL record only exists for logical + // decoding. + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_HEAP_CONFIRM || info == pg_constants::XLOG_HEAP_INPLACE { // these don't update the FSM or VM, so no special handling needed. - Outcome::Noop + IngestRecordOutcome::Noop } else { - Outcome::UnknownRecordType - } + IngestRecordOutcome::UnknownRecordType + } } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { @@ -748,19 +753,19 @@ impl<'a> WalIngest<'a> { || info == pg_constants::XLOG_HEAP2_NEW_CID { // related to logical replication, we can ignore in storage - Outcome::Noop + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_HEAP2_PRUNE || info == pg_constants::XLOG_HEAP2_VACUUM || info == pg_constants::XLOG_HEAP2_FREEZE_PAGE { // these don't update the VM, so no special handling needed. - Outcome::Noop + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_HEAP2_VISIBLE { // This updates the VM, but the VM page is registered as a normal // block in the WAL record, so no special handling is needed. - Outcome::Noop + IngestRecordOutcome::Noop } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } else { bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); @@ -806,17 +811,17 @@ impl<'a> WalIngest<'a> { } } else if info == pg_constants::XLOG_HEAP_TRUNCATE { // per comment in heap_redo: - // TRUNCATE is a no-op because the actions are already logged as - // SMGR WAL records. TRUNCATE WAL record only exists for logical - // decoding. - Outcome::Noop + // TRUNCATE is a no-op because the actions are already logged as + // SMGR WAL records. TRUNCATE WAL record only exists for logical + // decoding. + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_HEAP_CONFIRM || info == pg_constants::XLOG_HEAP_INPLACE { // these don't update the FSM or VM, so no special handling needed. - Outcome::Noop + IngestRecordOutcome::Noop } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; @@ -845,19 +850,19 @@ impl<'a> WalIngest<'a> { || info == pg_constants::XLOG_HEAP2_NEW_CID { // related to logical replication, we can ignore in storage - Outcome::Noop + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_HEAP2_PRUNE || info == pg_constants::XLOG_HEAP2_VACUUM || info == pg_constants::XLOG_HEAP2_FREEZE_PAGE { // these don't update the VM, so no special handling needed. - Outcome::Noop + IngestRecordOutcome::Noop } else if info == pg_constants::XLOG_HEAP2_VISIBLE { // This updates the VM, but the VM page is registered as a normal // block in the WAL record, so no special handling is needed. - Outcome::Noop + IngestRecordOutcome::Noop } else { - Outcome::UnknownRecordType + IngestRecordOutcome::UnknownRecordType } } else { bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);