diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 8b11329fb9..befa21af73 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -109,8 +109,7 @@ impl<'a> WalIngest<'a> { // Heap AM records need some special handling, because they modify VM pages // without registering them with the standard mechanism. self.ingest_heapam_record(&mut buf, modification, decoded, ctx) - .await?; - Outcome::Modified + .await? } pg_constants::RM_NEON_ID => { self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx) @@ -131,6 +130,8 @@ impl<'a> WalIngest<'a> { self.ingest_xlog_smgr_truncate(modification, &truncate, ctx) .await?; Outcome::Modified + } else { + Outcome::UnknownRecordType } } pg_constants::RM_DBASE_ID => { @@ -147,6 +148,7 @@ impl<'a> WalIngest<'a> { Outcome::Modified } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); + // HEIKKI: I think 0 tablespaces cannot happen. for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); modification @@ -154,6 +156,8 @@ impl<'a> WalIngest<'a> { .await?; Outcome::Modified } + } else { + Outcome::UnknownRecordType } } else if self.timeline.pg_version == 15 { if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { @@ -171,6 +175,7 @@ impl<'a> WalIngest<'a> { } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); let mut outcome = Outcome::Noop; + // HEIKKI: I think 0 tablespaces cannot happen. for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); modification @@ -179,6 +184,8 @@ impl<'a> WalIngest<'a> { outcome = Outcome::Modified; } outcome + } else { + Outcome::UnknownRecordType } } else if self.timeline.pg_version == 16 { if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { @@ -196,6 +203,7 @@ impl<'a> WalIngest<'a> { } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { let dropdb = XlDropDatabase::decode(&mut buf); let mut outcome = Outcome::Noop; + // HEIKKI: I think 0 tablespaces cannot happen. for tablespace_id in dropdb.tablespace_ids { trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); modification @@ -203,6 +211,8 @@ impl<'a> WalIngest<'a> { .await?; outcome = Outcome::Modified; } + } else { + Outcome::UnknownRecordType } } } @@ -227,12 +237,13 @@ impl<'a> WalIngest<'a> { ) .await?; Outcome::Modified - } else { - assert!(info == pg_constants::CLOG_TRUNCATE); + } else if info == pg_constants::CLOG_TRUNCATE) { let xlrec = XlClogTruncate::decode(&mut buf); self.ingest_clog_truncate_record(modification, &xlrec, ctx) .await?; Outcome::Modified + } else { + Outcome::UnknownRecordType } } pg_constants::RM_XACT_ID => { @@ -277,6 +288,10 @@ impl<'a> WalIngest<'a> { .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx) .await?; Outcome::Modified + } else if info == pg_constants::XLOG_XACT_ASSIGNMENT { + Outcome::Noop + } else if info == pg_constants::XLOG_XACT_INVALIDATIONS { + Outcome::Noop } } pg_constants::RM_MULTIXACT_ID => { @@ -319,13 +334,21 @@ impl<'a> WalIngest<'a> { self.ingest_multixact_truncate_record(modification, &xlrec, ctx) .await?; Outcome::Modified + } else { + Outcome::UnknownRecordType } } pg_constants::RM_RELMAP_ID => { - let xlrec = XlRelmapUpdate::decode(&mut buf); - self.ingest_relmap_page(modification, &xlrec, decoded, ctx) - .await?; - Outcome::Modified + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + + if info == pg_constants::XLOG_RELMAP_UPDATE { + let xlrec = XlRelmapUpdate::decode(&mut buf); + self.ingest_relmap_page(modification, &xlrec, decoded, ctx) + .await?; + Outcome::Modified + } else { + Outcome::UnknownRecordType + } } pg_constants::RM_XLOG_ID => { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; @@ -358,6 +381,31 @@ impl<'a> WalIngest<'a> { self.checkpoint_modified = true; } Outcome::Noop + } else if info == pg_constants::XLOG_FPI + || info == pg_constants::XLOG_FPI_FOR_HINT + { + // These records are importan for us, bu they are handled by + // generic ingest_decoded_block() function below. They don't need + // any special handling. + // + // HEIKKI: Is Noop the right code for that case? + Outcome::Noop + } else if info == pg_constants::XLOG_NOOP + || info == pg_constants::XLOG_NEXTOID + || info == pg_constants::XLOG_SWITCH + || info == pg_constants::XLOG_BACKUP_END + || info == pg_constants::XLOG_PARAMETER_CHANGE + || info == pg_constants::XLOG_RESTORE_POINT + || info == pg_constants::XLOG_FPW_CHANGE + || info == pg_constants::XLOG_END_OF_RECOVERY + { + Outcome::Noop + } else if info == pg_constants::XLOG_OVERWRITE_CONTRECORD { + // HEIKKI: I suspect we're not handling these correctly. + // See https://github.com/neondatabase/neon/issues/934 + // Given that, not sure what the right outcome is. + } else { + Outcome::UnexpectedRecordType } } pg_constants::RM_LOGICALMSG_ID => { @@ -380,8 +428,30 @@ impl<'a> WalIngest<'a> { modification.put_file(path, message, ctx).await?; Outcome::Modified } + } else { + Outcome::UnexpectedRecordType } } + pg_constants::RM_STANDBY_ID => Outcome::Noop, + + // All of these are handled by the generic ingest_decoded_block function + pg_constants::RM_BTREE_ID => Outcome::Noop, + pg_constants::RM_HASH_ID => Outcome::Noop, + pg_constants::RM_GIN_ID => Outcome::Noop, + pg_constants::RM_GIST_ID => Outcome::Noop, + pg_constants::RM_SEQ_ID => Outcome::Noop, + pg_constants::RM_SPGIST_ID => Outcome::Noop, + pg_constants::RM_BRIN_ID => Outcome::Noop, + pg_constants::RM_GENERIC_ID => Outcome::Noop, + + // We don't support the commit-ts tracking in neon. No harm if we see + // these records though. + pg_constants::RM_COMMIT_TS_ID => Outcome::Noop, + + // These are related to logical replication. I don't know if we should + // do something with them. @knizhnik? + pg_constants::RM_REPLORIGIN_ID => Outcome::Noop, + _x => Outcome::UnknownRecordType, }; @@ -491,7 +561,7 @@ impl<'a> WalIngest<'a> { modification: &mut DatadirModification<'_>, decoded: &DecodedWALRecord, ctx: &RequestContext, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { // Handle VM bit updates that are implicitly part of heap records. // First, look at the record to determine which VM bits need @@ -540,6 +610,19 @@ impl<'a> WalIngest<'a> { old_heap_blkno = Some(decoded.blocks[0].blkno); flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; } + } else if info == pg_constants::XLOG_HEAP_TRUNCATE { + // per comment in heap_redo: + // TRUNCATE is a no-op because the actions are already logged as + // SMGR WAL records. TRUNCATE WAL record only exists for logical + // decoding. + Outcome::Noop + } else if info == pg_constants::XLOG_HEAP_CONFIRM + || info == pg_constants::XLOG_HEAP_INPLACE + { + // these don't update the FSM or VM, so no special handling needed. + Outcome::Noop + } else { + Outcome::UnknownRecordType } } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; @@ -564,6 +647,23 @@ impl<'a> WalIngest<'a> { old_heap_blkno = Some(decoded.blocks[0].blkno); flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; } + } else if info == pg_constants::XLOG_HEAP2_REWRITE + || info == pg_constants::XLOG_HEAP2_NEW_CID + { + // related to logical replication, we can ignore in storage + Outcome::Noop + } else if info == pg_constants::XLOG_HEAP2_PRUNE + || info == pg_constants::XLOG_HEAP2_VACUUM + || info == pg_constants::XLOG_HEAP2_FREEZE_PAGE + { + // these don't update the VM, so no special handling needed. + Outcome::Noop + } else if info == pg_constants::XLOG_HEAP2_VISIBLE { + // This updates the VM, but the VM page is registered as a normal + // block in the WAL record, so no special handling is needed. + Outcome::Noop + } else { + Outcome::UnknownRecordType } } else { bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); @@ -607,7 +707,20 @@ impl<'a> WalIngest<'a> { old_heap_blkno = Some(decoded.blocks[0].blkno); flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; } - } + } else if info == pg_constants::XLOG_HEAP_TRUNCATE { + // per comment in heap_redo: + // TRUNCATE is a no-op because the actions are already logged as + // SMGR WAL records. TRUNCATE WAL record only exists for logical + // decoding. + Outcome::Noop + } else if info == pg_constants::XLOG_HEAP_CONFIRM + || info == pg_constants::XLOG_HEAP_INPLACE + { + // these don't update the FSM or VM, so no special handling needed. + Outcome::Noop + } else { + Outcome::UnknownRecordType + } } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { @@ -631,6 +744,23 @@ impl<'a> WalIngest<'a> { old_heap_blkno = Some(decoded.blocks[0].blkno); flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; } + } else if info == pg_constants::XLOG_HEAP2_REWRITE + || info == pg_constants::XLOG_HEAP2_NEW_CID + { + // related to logical replication, we can ignore in storage + Outcome::Noop + } else if info == pg_constants::XLOG_HEAP2_PRUNE + || info == pg_constants::XLOG_HEAP2_VACUUM + || info == pg_constants::XLOG_HEAP2_FREEZE_PAGE + { + // these don't update the VM, so no special handling needed. + Outcome::Noop + } else if info == pg_constants::XLOG_HEAP2_VISIBLE { + // This updates the VM, but the VM page is registered as a normal + // block in the WAL record, so no special handling is needed. + Outcome::Noop + } else { + Outcome::UnknownRecordType } } else { bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); @@ -674,6 +804,19 @@ impl<'a> WalIngest<'a> { old_heap_blkno = Some(decoded.blocks[0].blkno); flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; } + } else if info == pg_constants::XLOG_HEAP_TRUNCATE { + // per comment in heap_redo: + // TRUNCATE is a no-op because the actions are already logged as + // SMGR WAL records. TRUNCATE WAL record only exists for logical + // decoding. + Outcome::Noop + } else if info == pg_constants::XLOG_HEAP_CONFIRM + || info == pg_constants::XLOG_HEAP_INPLACE + { + // these don't update the FSM or VM, so no special handling needed. + Outcome::Noop + } else { + Outcome::UnknownRecordType } } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; @@ -698,6 +841,23 @@ impl<'a> WalIngest<'a> { old_heap_blkno = Some(decoded.blocks[0].blkno); flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; } + } else if info == pg_constants::XLOG_HEAP2_REWRITE + || info == pg_constants::XLOG_HEAP2_NEW_CID + { + // related to logical replication, we can ignore in storage + Outcome::Noop + } else if info == pg_constants::XLOG_HEAP2_PRUNE + || info == pg_constants::XLOG_HEAP2_VACUUM + || info == pg_constants::XLOG_HEAP2_FREEZE_PAGE + { + // these don't update the VM, so no special handling needed. + Outcome::Noop + } else if info == pg_constants::XLOG_HEAP2_VISIBLE { + // This updates the VM, but the VM page is registered as a normal + // block in the WAL record, so no special handling is needed. + Outcome::Noop + } else { + Outcome::UnknownRecordType } } else { bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);