diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 4189200d5c..8b11329fb9 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -98,16 +98,24 @@ impl<'a> WalIngest<'a> { self.checkpoint_modified = true; } - match decoded.xl_rmid { + enum Outcome { + Noop, + Modified, + UnknownRecordType, + } + + let outcome = match decoded.xl_rmid { pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => { // Heap AM records need some special handling, because they modify VM pages // without registering them with the standard mechanism. self.ingest_heapam_record(&mut buf, modification, decoded, ctx) .await?; + Outcome::Modified } pg_constants::RM_NEON_ID => { self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx) .await?; + Outcome::Modified } // Handle other special record types pg_constants::RM_SMGR_ID => { @@ -117,10 +125,12 @@ impl<'a> WalIngest<'a> { let create = XlSmgrCreate::decode(&mut buf); self.ingest_xlog_smgr_create(modification, &create, ctx) .await?; + Outcome::Modified } else if info == pg_constants::XLOG_SMGR_TRUNCATE { let truncate = XlSmgrTruncate::decode(&mut buf); self.ingest_xlog_smgr_truncate(modification, &truncate, ctx) .await?; + Outcome::Modified } } pg_constants::RM_DBASE_ID => { @@ -134,6 +144,7 @@ impl<'a> WalIngest<'a> { self.ingest_xlog_dbase_create(modification, &createdb, ctx) .await?; + Outcome::Modified } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); for tablespace_id in dropdb.tablespace_ids { @@ -141,11 +152,13 @@ impl<'a> WalIngest<'a> { modification .drop_dbdir(tablespace_id, dropdb.db_id, ctx) .await?; + Outcome::Modified } } } else if self.timeline.pg_version == 15 { if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + Outcome::Noop } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { // The XLOG record was renamed between v14 and v15, // but the record format is the same. @@ -154,18 +167,23 @@ impl<'a> WalIngest<'a> { let createdb = XlCreateDatabase::decode(&mut buf); self.ingest_xlog_dbase_create(modification, &createdb, ctx) .await?; + Outcome::Modified } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); + let mut outcome = Outcome::Noop; for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); modification .drop_dbdir(tablespace_id, dropdb.db_id, ctx) .await?; + outcome = Outcome::Modified; } + outcome } } else if self.timeline.pg_version == 16 { if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + Outcome::Noop } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { // The XLOG record was renamed between v14 and v15, // but the record format is the same. @@ -174,19 +192,23 @@ impl<'a> WalIngest<'a> { let createdb = XlCreateDatabase::decode(&mut buf); self.ingest_xlog_dbase_create(modification, &createdb, ctx) .await?; + Outcome::Noop } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); + let mut outcome = Outcome::Noop; for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); modification .drop_dbdir(tablespace_id, dropdb.db_id, ctx) .await?; + outcome = Outcome::Modified; } } } } pg_constants::RM_TBLSPC_ID => { trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); + Outcome::Noop } pg_constants::RM_CLOG_ID => { let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; @@ -204,11 +226,13 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; + Outcome::Modified } else { assert!(info == pg_constants::CLOG_TRUNCATE); let xlrec = XlClogTruncate::decode(&mut buf); self.ingest_clog_truncate_record(modification, &xlrec, ctx) .await?; + Outcome::Modified } } pg_constants::RM_XACT_ID => { @@ -224,6 +248,7 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; + Outcome::Modified } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED || info == pg_constants::XLOG_XACT_ABORT_PREPARED { @@ -246,10 +271,12 @@ impl<'a> WalIngest<'a> { modification .drop_twophase_file(parsed_xact.xid, ctx) .await?; + Outcome::Modified } else if info == pg_constants::XLOG_XACT_PREPARE { modification .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx) .await?; + Outcome::Modified } } pg_constants::RM_MULTIXACT_ID => { @@ -268,6 +295,7 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; + Outcome::Modified } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { let pageno = buf.get_u32_le(); let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; @@ -281,19 +309,23 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; + Outcome::Modified } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { let xlrec = XlMultiXactCreate::decode(&mut buf); self.ingest_multixact_create_record(modification, &xlrec)?; + Outcome::Modified } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { let xlrec = XlMultiXactTruncate::decode(&mut buf); self.ingest_multixact_truncate_record(modification, &xlrec, ctx) .await?; + Outcome::Modified } } pg_constants::RM_RELMAP_ID => { let xlrec = XlRelmapUpdate::decode(&mut buf); self.ingest_relmap_page(modification, &xlrec, decoded, ctx) .await?; + Outcome::Modified } pg_constants::RM_XLOG_ID => { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; @@ -304,6 +336,7 @@ impl<'a> WalIngest<'a> { self.checkpoint.nextOid = next_oid; self.checkpoint_modified = true; } + Outcome::Noop // TODO review } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN { @@ -324,6 +357,7 @@ impl<'a> WalIngest<'a> { self.checkpoint.oldestXid = xlog_checkpoint.oldestXid; self.checkpoint_modified = true; } + Outcome::Noop } } pg_constants::RM_LOGICALMSG_ID => { @@ -341,14 +375,29 @@ impl<'a> WalIngest<'a> { crate::failpoint_support::sleep_millis_async!( "wal-ingest-logical-message-sleep" ); + Outcome::Noop } else if let Some(path) = prefix.strip_prefix("neon-file:") { modification.put_file(path, message, ctx).await?; + Outcome::Modified } } } - _x => { + _x => Outcome::UnknownRecordType, + }; + + match outcome { + Outcome::Noop => { + // TODO: https://github.com/neondatabase/neon/issues/5962 + // => figure out what to do so that we still advance last_record_lsn + } + Outcome::Modified => { + // TODO: https://github.com/neondatabase/neon/issues/5962 + // => assert that indeed last_record_lsn is this record's LSN + } + Outcome::UnknownRecordType => { // TODO: should probably log & fail here instead of blindly - // doing something without understanding the protocol + // doing something without understanding the protocol. + // No issue exists for this yet. } }