From a6fb4f26873e62e12a7492a5d4cf185f3eb81b5a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 30 Nov 2023 16:34:09 +0000 Subject: [PATCH] walingest: prepare for exhaustive match and handling of no-ops Use rust type system to force the "special treatment" block to declare the expected outcome of special treatment. The TODO comments will be addressed in future commits, for now let's just encode our expectations. This commit doesn't compile yet because a bunch of `else { ... }` blocks are missing. I'll ask Postgres-savvy colleagues to fill in here. --- pageserver/src/walingest.rs | 55 +++++++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 3 deletions(-) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 4189200d5c..8b11329fb9 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -98,16 +98,24 @@ impl<'a> WalIngest<'a> { self.checkpoint_modified = true; } - match decoded.xl_rmid { + enum Outcome { + Noop, + Modified, + UnknownRecordType, + } + + let outcome = match decoded.xl_rmid { pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => { // Heap AM records need some special handling, because they modify VM pages // without registering them with the standard mechanism. self.ingest_heapam_record(&mut buf, modification, decoded, ctx) .await?; + Outcome::Modified } pg_constants::RM_NEON_ID => { self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx) .await?; + Outcome::Modified } // Handle other special record types pg_constants::RM_SMGR_ID => { @@ -117,10 +125,12 @@ impl<'a> WalIngest<'a> { let create = XlSmgrCreate::decode(&mut buf); self.ingest_xlog_smgr_create(modification, &create, ctx) .await?; + Outcome::Modified } else if info == pg_constants::XLOG_SMGR_TRUNCATE { let truncate = XlSmgrTruncate::decode(&mut buf); self.ingest_xlog_smgr_truncate(modification, &truncate, ctx) .await?; + Outcome::Modified } } pg_constants::RM_DBASE_ID => { @@ -134,6 +144,7 @@ impl<'a> WalIngest<'a> { self.ingest_xlog_dbase_create(modification, &createdb, ctx) .await?; + Outcome::Modified } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); for tablespace_id in dropdb.tablespace_ids { @@ -141,11 +152,13 @@ impl<'a> WalIngest<'a> { modification .drop_dbdir(tablespace_id, dropdb.db_id, ctx) .await?; + Outcome::Modified } } } else if self.timeline.pg_version == 15 { if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + Outcome::Noop } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { // The XLOG record was renamed between v14 and v15, // but the record format is the same. @@ -154,18 +167,23 @@ impl<'a> WalIngest<'a> { let createdb = XlCreateDatabase::decode(&mut buf); self.ingest_xlog_dbase_create(modification, &createdb, ctx) .await?; + Outcome::Modified } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); + let mut outcome = Outcome::Noop; for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); modification .drop_dbdir(tablespace_id, dropdb.db_id, ctx) .await?; + outcome = Outcome::Modified; } + outcome } } else if self.timeline.pg_version == 16 { if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + Outcome::Noop } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { // The XLOG record was renamed between v14 and v15, // but the record format is the same. @@ -174,19 +192,23 @@ impl<'a> WalIngest<'a> { let createdb = XlCreateDatabase::decode(&mut buf); self.ingest_xlog_dbase_create(modification, &createdb, ctx) .await?; + Outcome::Noop } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); + let mut outcome = Outcome::Noop; for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); modification .drop_dbdir(tablespace_id, dropdb.db_id, ctx) .await?; + outcome = Outcome::Modified; } } } } pg_constants::RM_TBLSPC_ID => { trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); + Outcome::Noop } pg_constants::RM_CLOG_ID => { let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; @@ -204,11 +226,13 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; + Outcome::Modified } else { assert!(info == pg_constants::CLOG_TRUNCATE); let xlrec = XlClogTruncate::decode(&mut buf); self.ingest_clog_truncate_record(modification, &xlrec, ctx) .await?; + Outcome::Modified } } pg_constants::RM_XACT_ID => { @@ -224,6 +248,7 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; + Outcome::Modified } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED || info == pg_constants::XLOG_XACT_ABORT_PREPARED { @@ -246,10 +271,12 @@ impl<'a> WalIngest<'a> { modification .drop_twophase_file(parsed_xact.xid, ctx) .await?; + Outcome::Modified } else if info == pg_constants::XLOG_XACT_PREPARE { modification .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx) .await?; + Outcome::Modified } } pg_constants::RM_MULTIXACT_ID => { @@ -268,6 +295,7 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; + Outcome::Modified } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { let pageno = buf.get_u32_le(); let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; @@ -281,19 +309,23 @@ impl<'a> WalIngest<'a> { ctx, ) .await?; + Outcome::Modified } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { let xlrec = XlMultiXactCreate::decode(&mut buf); self.ingest_multixact_create_record(modification, &xlrec)?; + Outcome::Modified } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { let xlrec = XlMultiXactTruncate::decode(&mut buf); self.ingest_multixact_truncate_record(modification, &xlrec, ctx) .await?; + Outcome::Modified } } pg_constants::RM_RELMAP_ID => { let xlrec = XlRelmapUpdate::decode(&mut buf); self.ingest_relmap_page(modification, &xlrec, decoded, ctx) .await?; + Outcome::Modified } pg_constants::RM_XLOG_ID => { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; @@ -304,6 +336,7 @@ impl<'a> WalIngest<'a> { self.checkpoint.nextOid = next_oid; self.checkpoint_modified = true; } + Outcome::Noop // TODO review } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN { @@ -324,6 +357,7 @@ impl<'a> WalIngest<'a> { self.checkpoint.oldestXid = xlog_checkpoint.oldestXid; self.checkpoint_modified = true; } + Outcome::Noop } } pg_constants::RM_LOGICALMSG_ID => { @@ -341,14 +375,29 @@ impl<'a> WalIngest<'a> { crate::failpoint_support::sleep_millis_async!( "wal-ingest-logical-message-sleep" ); + Outcome::Noop } else if let Some(path) = prefix.strip_prefix("neon-file:") { modification.put_file(path, message, ctx).await?; + Outcome::Modified } } } - _x => { + _x => Outcome::UnknownRecordType, + }; + + match outcome { + Outcome::Noop => { + // TODO: https://github.com/neondatabase/neon/issues/5962 + // => figure out what to do so that we still advance last_record_lsn + } + Outcome::Modified => { + // TODO: https://github.com/neondatabase/neon/issues/5962 + // => assert that indeed last_record_lsn is this record's LSN + } + Outcome::UnknownRecordType => { // TODO: should probably log & fail here instead of blindly - // doing something without understanding the protocol + // doing something without understanding the protocol. + // No issue exists for this yet. } }