From 5069123b6d5c5cdcac1a011cb3141cd915298161 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 24 Oct 2024 17:12:47 +0100 Subject: [PATCH] pageserver: refactor ingest inplace to decouple decoding and handling (#9472) ## Problem WAL ingest couples decoding of special records with their handling (updates to the storage engine mostly). This is a roadblock for our plan to move WAL filtering (and implicitly decoding) to safekeepers since they cannot do writes to the storage engine. ## Summary of changes This PR decouples the decoding of the special WAL records from their application. The changes are done in place and I've done my best to refrain from refactorings and attempted to preserve the original code as much as possible. Related: https://github.com/neondatabase/neon/issues/9335 Epic: https://github.com/neondatabase/neon/issues/9329 --- pageserver/src/walingest.rs | 1547 +++++++++++------ test_runner/regress/test_tenant_relocation.py | 20 +- 2 files changed, 1031 insertions(+), 536 deletions(-) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index d3e8bf59f2..8a4c0554f8 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -107,6 +107,143 @@ struct WarnIngestLag { timestamp_invalid_msg_ratelimit: RateLimit, } +// These structs are an intermediary representation of the PostgreSQL WAL records. +// The ones prefixed with `Xl` are lower level, while the ones that are not have +// all the required context to be acted upon by the pageserver. + +enum HeapamRecord { + ClearVmBits(ClearVmBits), +} + +struct ClearVmBits { + new_heap_blkno: Option, + old_heap_blkno: Option, + vm_rel: RelTag, + flags: u8, +} + +enum NeonrmgrRecord { + ClearVmBits(ClearVmBits), +} + +enum SmgrRecord { + Create(SmgrCreate), + Truncate(XlSmgrTruncate), +} + +struct SmgrCreate { + rel: RelTag, +} + +enum DbaseRecord { + Create(DbaseCreate), + Drop(DbaseDrop), +} + +struct DbaseCreate { + db_id: u32, + tablespace_id: u32, + src_db_id: u32, + src_tablespace_id: u32, +} + +struct DbaseDrop { + db_id: u32, + tablespace_ids: Vec, +} + +enum ClogRecord { + ZeroPage(ClogZeroPage), + Truncate(ClogTruncate), +} + +struct ClogZeroPage { + segno: u32, + rpageno: u32, +} + +struct ClogTruncate { + pageno: u32, + oldest_xid: u32, + oldest_xid_db: u32, +} + +enum XactRecord { + Commit(XactCommon), + Abort(XactCommon), + CommitPrepared(XactCommon), + AbortPrepared(XactCommon), + Prepare(XactPrepare), +} + +struct XactCommon { + parsed: XlXactParsedRecord, + origin_id: u16, + // Fields below are only used for logging + xl_xid: u32, + lsn: Lsn, +} + +struct XactPrepare { + xl_xid: u32, + data: Bytes, +} + +enum MultiXactRecord { + ZeroPage(MultiXactZeroPage), + Create(XlMultiXactCreate), + Truncate(XlMultiXactTruncate), +} + +struct MultiXactZeroPage { + slru_kind: SlruKind, + segno: u32, + rpageno: u32, +} + +enum RelmapRecord { + Update(RelmapUpdate), +} + +struct RelmapUpdate { + update: XlRelmapUpdate, + buf: Bytes, +} + +enum XlogRecord { + Raw(RawXlogRecord), +} + +struct RawXlogRecord { + info: u8, + lsn: Lsn, + buf: Bytes, +} + +enum LogicalMessageRecord { + Put(PutLogicalMessage), + #[cfg(feature = "testing")] + Failpoint, +} + +struct PutLogicalMessage { + path: String, + buf: Bytes, +} + +enum StandbyRecord { + RunningXacts(StandbyRunningXacts), +} + +struct StandbyRunningXacts { + oldest_running_xid: u32, +} + +enum ReploriginRecord { + Set(XlReploriginSet), + Drop(XlReploriginDrop), +} + impl WalIngest { pub async fn new( timeline: &Timeline, @@ -182,105 +319,58 @@ impl WalIngest { pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => { // Heap AM records need some special handling, because they modify VM pages // without registering them with the standard mechanism. - self.ingest_heapam_record(&mut buf, modification, &decoded, ctx) - .await?; + let maybe_heapam_record = + Self::decode_heapam_record(&mut buf, &decoded, pg_version)?; + if let Some(heapam_record) = maybe_heapam_record { + match heapam_record { + HeapamRecord::ClearVmBits(clear_vm_bits) => { + self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx) + .await?; + } + } + } } pg_constants::RM_NEON_ID => { - self.ingest_neonrmgr_record(&mut buf, modification, &decoded, ctx) - .await?; + let maybe_nenonrmgr_record = + Self::decode_neonmgr_record(&mut buf, &decoded, pg_version)?; + if let Some(neonrmgr_record) = maybe_nenonrmgr_record { + match neonrmgr_record { + NeonrmgrRecord::ClearVmBits(clear_vm_bits) => { + self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx) + .await?; + } + } + } } // Handle other special record types pg_constants::RM_SMGR_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_SMGR_CREATE { - let create = XlSmgrCreate::decode(&mut buf); - self.ingest_xlog_smgr_create(modification, &create, ctx) - .await?; - } else if info == pg_constants::XLOG_SMGR_TRUNCATE { - let truncate = XlSmgrTruncate::decode(&mut buf); - self.ingest_xlog_smgr_truncate(modification, &truncate, ctx) - .await?; + let maybe_smgr_record = + Self::decode_smgr_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(smgr_record) = maybe_smgr_record { + match smgr_record { + SmgrRecord::Create(create) => { + self.ingest_xlog_smgr_create(create, modification, ctx) + .await?; + } + SmgrRecord::Truncate(truncate) => { + self.ingest_xlog_smgr_truncate(truncate, modification, ctx) + .await?; + } + } } } pg_constants::RM_DBASE_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - debug!(%info, %pg_version, "handle RM_DBASE_ID"); + let maybe_dbase_record = + Self::decode_dbase_record(&mut buf, &decoded, pg_version).unwrap(); - if pg_version == 14 { - if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE { - let createdb = XlCreateDatabase::decode(&mut buf); - debug!("XLOG_DBASE_CREATE v14"); - - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) + if let Some(dbase_record) = maybe_dbase_record { + match dbase_record { + DbaseRecord::Create(create) => { + self.ingest_xlog_dbase_create(create, modification, ctx) .await?; } - } - } else if pg_version == 15 { - if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; - } - } - } else if pg_version == 16 { - if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; - } - } - } else if pg_version == 17 { - if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; + DbaseRecord::Drop(drop) => { + self.ingest_xlog_dbase_drop(drop, modification, ctx).await?; } } } @@ -289,266 +379,113 @@ impl WalIngest { trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); } pg_constants::RM_CLOG_ID => { - let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; + // [`Self::decode_clog_record`] may never fail and always returns. + // It has this interface to match all the other decoding methods. + let clog_record = Self::decode_clog_record(&mut buf, &decoded, pg_version) + .unwrap() + .unwrap(); - if info == pg_constants::CLOG_ZEROPAGE { - let pageno = if pg_version < 17 { - buf.get_u32_le() - } else { - buf.get_u64_le() as u32 - }; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - self.put_slru_page_image( - modification, - SlruKind::Clog, - segno, - rpageno, - ZERO_PAGE.clone(), - ctx, - ) - .await?; - } else { - assert!(info == pg_constants::CLOG_TRUNCATE); - let xlrec = XlClogTruncate::decode(&mut buf, pg_version); - self.ingest_clog_truncate_record(modification, &xlrec, ctx) - .await?; + match clog_record { + ClogRecord::ZeroPage(zero_page) => { + self.ingest_clog_zero_page(zero_page, modification, ctx) + .await?; + } + ClogRecord::Truncate(truncate) => { + self.ingest_clog_truncate(truncate, modification, ctx) + .await?; + } } } pg_constants::RM_XACT_ID => { - let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; - - if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT { - let parsed_xact = - XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - self.ingest_xact_record( - modification, - &parsed_xact, - info == pg_constants::XLOG_XACT_COMMIT, - decoded.origin_id, - ctx, - ) - .await?; - } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED - || info == pg_constants::XLOG_XACT_ABORT_PREPARED - { - let parsed_xact = - XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - self.ingest_xact_record( - modification, - &parsed_xact, - info == pg_constants::XLOG_XACT_COMMIT_PREPARED, - decoded.origin_id, - ctx, - ) - .await?; - // Remove twophase file. see RemoveTwoPhaseFile() in postgres code - trace!( - "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}", - decoded.xl_xid, - parsed_xact.xid, - lsn, - ); - - let xid: u64 = if pg_version >= 17 { - self.adjust_to_full_transaction_id(parsed_xact.xid)? - } else { - parsed_xact.xid as u64 - }; - modification.drop_twophase_file(xid, ctx).await?; - } else if info == pg_constants::XLOG_XACT_PREPARE { - let xid: u64 = if pg_version >= 17 { - self.adjust_to_full_transaction_id(decoded.xl_xid)? - } else { - decoded.xl_xid as u64 - }; - modification - .put_twophase_file(xid, Bytes::copy_from_slice(&buf[..]), ctx) + let maybe_xact_record = + Self::decode_xact_record(&mut buf, &decoded, lsn, pg_version).unwrap(); + if let Some(xact_record) = maybe_xact_record { + self.ingest_xact_record(xact_record, modification, ctx) .await?; } } pg_constants::RM_MULTIXACT_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { - let pageno = if pg_version < 17 { - buf.get_u32_le() - } else { - buf.get_u64_le() as u32 - }; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - self.put_slru_page_image( - modification, - SlruKind::MultiXactOffsets, - segno, - rpageno, - ZERO_PAGE.clone(), - ctx, - ) - .await?; - } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { - let pageno = if pg_version < 17 { - buf.get_u32_le() - } else { - buf.get_u64_le() as u32 - }; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - self.put_slru_page_image( - modification, - SlruKind::MultiXactMembers, - segno, - rpageno, - ZERO_PAGE.clone(), - ctx, - ) - .await?; - } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(&mut buf); - self.ingest_multixact_create_record(modification, &xlrec)?; - } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { - let xlrec = XlMultiXactTruncate::decode(&mut buf); - self.ingest_multixact_truncate_record(modification, &xlrec, ctx) - .await?; + let maybe_multixact_record = + Self::decode_multixact_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(multixact_record) = maybe_multixact_record { + match multixact_record { + MultiXactRecord::ZeroPage(zero_page) => { + self.ingest_multixact_zero_page(zero_page, modification, ctx) + .await?; + } + MultiXactRecord::Create(create) => { + self.ingest_multixact_create(modification, &create)?; + } + MultiXactRecord::Truncate(truncate) => { + self.ingest_multixact_truncate(modification, &truncate, ctx) + .await?; + } + } } } pg_constants::RM_RELMAP_ID => { - let xlrec = XlRelmapUpdate::decode(&mut buf); - self.ingest_relmap_page(modification, &xlrec, &decoded, ctx) - .await?; - } - pg_constants::RM_XLOG_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_PARAMETER_CHANGE { - if let CheckPoint::V17(cp) = &mut self.checkpoint { - let rec = v17::XlParameterChange::decode(&mut buf); - cp.wal_level = rec.wal_level; - self.checkpoint_modified = true; - } - } else if info == pg_constants::XLOG_END_OF_RECOVERY { - if let CheckPoint::V17(cp) = &mut self.checkpoint { - let rec = v17::XlEndOfRecovery::decode(&mut buf); - cp.wal_level = rec.wal_level; - self.checkpoint_modified = true; + let relmap_record = Self::decode_relmap_record(&mut buf, &decoded, pg_version) + .unwrap() + .unwrap(); + match relmap_record { + RelmapRecord::Update(update) => { + self.ingest_relmap_update(update, modification, ctx).await?; } } + } + // This is an odd duck. It needs to go to all shards. + // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY + // in WalIngest::new), we have to send the whole DecodedWalRecord::record to + // the pageserver and decode it there. + // + // Alternatively, one can make the checkpoint part of the subscription protocol + // to the pageserver. This should work fine, but can be done at a later point. + pg_constants::RM_XLOG_ID => { + let xlog_record = Self::decode_xlog_record(&mut buf, &decoded, lsn, pg_version) + .unwrap() + .unwrap(); - enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { - if info == pg_constants::XLOG_NEXTOID { - let next_oid = buf.get_u32_le(); - if cp.nextOid != next_oid { - cp.nextOid = next_oid; - self.checkpoint_modified = true; - } - } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE - || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN - { - let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT]; - buf.copy_to_slice(&mut checkpoint_bytes); - let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?; - trace!( - "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", - xlog_checkpoint.oldestXid, - cp.oldestXid - ); - if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 { - cp.oldestXid = xlog_checkpoint.oldestXid; - } - trace!( - "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}", - xlog_checkpoint.oldestActiveXid, - cp.oldestActiveXid - ); - - // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`, - // because at shutdown, all in-progress transactions will implicitly - // end. Postgres startup code knows that, and allows hot standby to start - // immediately from a shutdown checkpoint. - // - // In Neon, Postgres hot standby startup always behaves as if starting from - // an online checkpoint. It needs a valid `oldestActiveXid` value, so - // instead of overwriting self.checkpoint.oldestActiveXid with - // InvalidTransactionid from the checkpoint WAL record, update it to a - // proper value, knowing that there are no in-progress transactions at this - // point, except for prepared transactions. - // - // See also the neon code changes in the InitWalRecovery() function. - if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID - && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN - { - let oldest_active_xid = if pg_version >= 17 { - let mut oldest_active_full_xid = cp.nextXid.value; - for xid in modification.tline.list_twophase_files(lsn, ctx).await? { - if xid < oldest_active_full_xid { - oldest_active_full_xid = xid; - } - } - oldest_active_full_xid as u32 - } else { - let mut oldest_active_xid = cp.nextXid.value as u32; - for xid in modification.tline.list_twophase_files(lsn, ctx).await? { - let narrow_xid = xid as u32; - if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 { - oldest_active_xid = narrow_xid; - } - } - oldest_active_xid - }; - cp.oldestActiveXid = oldest_active_xid; - } else { - cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid; - } - - // Write a new checkpoint key-value pair on every checkpoint record, even - // if nothing really changed. Not strictly required, but it seems nice to - // have some trace of the checkpoint records in the layer files at the same - // LSNs. - self.checkpoint_modified = true; + match xlog_record { + XlogRecord::Raw(raw) => { + self.ingest_raw_xlog_record(raw, modification, ctx).await?; } - }); + } } pg_constants::RM_LOGICALMSG_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_LOGICAL_MESSAGE { - let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf); - let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?; - let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size]; - if prefix == "neon-test" { - // This is a convenient way to make the WAL ingestion pause at - // particular point in the WAL. For more fine-grained control, - // we could peek into the message and only pause if it contains - // a particular string, for example, but this is enough for now. - failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep"); - } else if let Some(path) = prefix.strip_prefix("neon-file:") { - modification.put_file(path, message, ctx).await?; + let maybe_logical_message_record = + Self::decode_logical_message_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(logical_message_record) = maybe_logical_message_record { + match logical_message_record { + LogicalMessageRecord::Put(put) => { + self.ingest_logical_message_put(put, modification, ctx) + .await?; + } + #[cfg(feature = "testing")] + LogicalMessageRecord::Failpoint => { + // This is a convenient way to make the WAL ingestion pause at + // particular point in the WAL. For more fine-grained control, + // we could peek into the message and only pause if it contains + // a particular string, for example, but this is enough for now. + failpoint_support::sleep_millis_async!( + "pageserver-wal-ingest-logical-message-sleep" + ); + } } } } pg_constants::RM_STANDBY_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_RUNNING_XACTS { - let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf); - - enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { - cp.oldestActiveXid = xlrec.oldest_running_xid; - }); - - self.checkpoint_modified = true; + let maybe_standby_record = + Self::decode_standby_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(standby_record) = maybe_standby_record { + self.ingest_standby_record(standby_record).unwrap(); } } pg_constants::RM_REPLORIGIN_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_REPLORIGIN_SET { - let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf); - modification - .set_replorigin(xlrec.node_id, xlrec.remote_lsn) - .await? - } else if info == pg_constants::XLOG_REPLORIGIN_DROP { - let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf); - modification.drop_replorigin(xlrec.node_id).await? + let maybe_replorigin_record = + Self::decode_replorigin_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(replorigin_record) = maybe_replorigin_record { + self.ingest_replorigin_record(replorigin_record, modification) + .await?; } } _x => { @@ -709,13 +646,99 @@ impl WalIngest { Ok(()) } - async fn ingest_heapam_record( + async fn ingest_clear_vm_bits( &mut self, - buf: &mut Bytes, + clear_vm_bits: ClearVmBits, modification: &mut DatadirModification<'_>, - decoded: &DecodedWALRecord, ctx: &RequestContext, ) -> anyhow::Result<()> { + let ClearVmBits { + new_heap_blkno, + old_heap_blkno, + flags, + vm_rel, + } = clear_vm_bits; + // Clear the VM bits if required. + let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); + let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); + + // Sometimes, Postgres seems to create heap WAL records with the + // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is + // not set. In fact, it's possible that the VM page does not exist at all. + // In that case, we don't want to store a record to clear the VM bit; + // replaying it would fail to find the previous image of the page, because + // it doesn't exist. So check if the VM page(s) exist, and skip the WAL + // record if it doesn't. + let vm_size = get_relsize(modification, vm_rel, ctx).await?; + if let Some(blknum) = new_vm_blk { + if blknum >= vm_size { + new_vm_blk = None; + } + } + if let Some(blknum) = old_vm_blk { + if blknum >= vm_size { + old_vm_blk = None; + } + } + + if new_vm_blk.is_some() || old_vm_blk.is_some() { + if new_vm_blk == old_vm_blk { + // An UPDATE record that needs to clear the bits for both old and the + // new page, both of which reside on the same VM page. + self.put_rel_wal_record( + modification, + vm_rel, + new_vm_blk.unwrap(), + NeonWalRecord::ClearVisibilityMapFlags { + new_heap_blkno, + old_heap_blkno, + flags, + }, + ctx, + ) + .await?; + } else { + // Clear VM bits for one heap page, or for two pages that reside on + // different VM pages. + if let Some(new_vm_blk) = new_vm_blk { + self.put_rel_wal_record( + modification, + vm_rel, + new_vm_blk, + NeonWalRecord::ClearVisibilityMapFlags { + new_heap_blkno, + old_heap_blkno: None, + flags, + }, + ctx, + ) + .await?; + } + if let Some(old_vm_blk) = old_vm_blk { + self.put_rel_wal_record( + modification, + vm_rel, + old_vm_blk, + NeonWalRecord::ClearVisibilityMapFlags { + new_heap_blkno: None, + old_heap_blkno, + flags, + }, + ctx, + ) + .await?; + } + } + } + + Ok(()) + } + + fn decode_heapam_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { // Handle VM bit updates that are implicitly part of heap records. // First, look at the record to determine which VM bits need @@ -725,7 +748,7 @@ impl WalIngest { let mut old_heap_blkno: Option = None; let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; - match modification.tline.pg_version { + match pg_version { 14 => { if decoded.xl_rmid == pg_constants::RM_HEAP_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; @@ -997,7 +1020,6 @@ impl WalIngest { _ => {} } - // Clear the VM bits if required. if new_heap_blkno.is_some() || old_heap_blkno.is_some() { let vm_rel = RelTag { forknum: VISIBILITYMAP_FORKNUM, @@ -1006,89 +1028,22 @@ impl WalIngest { relnode: decoded.blocks[0].rnode_relnode, }; - let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); - let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); - - // Sometimes, Postgres seems to create heap WAL records with the - // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is - // not set. In fact, it's possible that the VM page does not exist at all. - // In that case, we don't want to store a record to clear the VM bit; - // replaying it would fail to find the previous image of the page, because - // it doesn't exist. So check if the VM page(s) exist, and skip the WAL - // record if it doesn't. - let vm_size = get_relsize(modification, vm_rel, ctx).await?; - if let Some(blknum) = new_vm_blk { - if blknum >= vm_size { - new_vm_blk = None; - } - } - if let Some(blknum) = old_vm_blk { - if blknum >= vm_size { - old_vm_blk = None; - } - } - - if new_vm_blk.is_some() || old_vm_blk.is_some() { - if new_vm_blk == old_vm_blk { - // An UPDATE record that needs to clear the bits for both old and the - // new page, both of which reside on the same VM page. - self.put_rel_wal_record( - modification, - vm_rel, - new_vm_blk.unwrap(), - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno, - old_heap_blkno, - flags, - }, - ctx, - ) - .await?; - } else { - // Clear VM bits for one heap page, or for two pages that reside on - // different VM pages. - if let Some(new_vm_blk) = new_vm_blk { - self.put_rel_wal_record( - modification, - vm_rel, - new_vm_blk, - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno, - old_heap_blkno: None, - flags, - }, - ctx, - ) - .await?; - } - if let Some(old_vm_blk) = old_vm_blk { - self.put_rel_wal_record( - modification, - vm_rel, - old_vm_blk, - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno: None, - old_heap_blkno, - flags, - }, - ctx, - ) - .await?; - } - } - } + Ok(Some(HeapamRecord::ClearVmBits(ClearVmBits { + new_heap_blkno, + old_heap_blkno, + vm_rel, + flags, + }))) + } else { + Ok(None) } - - Ok(()) } - async fn ingest_neonrmgr_record( - &mut self, + fn decode_neonmgr_record( buf: &mut Bytes, - modification: &mut DatadirModification<'_>, decoded: &DecodedWALRecord, - ctx: &RequestContext, - ) -> anyhow::Result<()> { + pg_version: u32, + ) -> anyhow::Result> { // Handle VM bit updates that are implicitly part of heap records. // First, look at the record to determine which VM bits need @@ -1097,7 +1052,6 @@ impl WalIngest { let mut new_heap_blkno: Option = None; let mut old_heap_blkno: Option = None; let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; - let pg_version = modification.tline.pg_version; assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID); @@ -1168,7 +1122,6 @@ impl WalIngest { ), } - // Clear the VM bits if required. if new_heap_blkno.is_some() || old_heap_blkno.is_some() { let vm_rel = RelTag { forknum: VISIBILITYMAP_FORKNUM, @@ -1177,93 +1130,30 @@ impl WalIngest { relnode: decoded.blocks[0].rnode_relnode, }; - let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); - let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); - - // Sometimes, Postgres seems to create heap WAL records with the - // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is - // not set. In fact, it's possible that the VM page does not exist at all. - // In that case, we don't want to store a record to clear the VM bit; - // replaying it would fail to find the previous image of the page, because - // it doesn't exist. So check if the VM page(s) exist, and skip the WAL - // record if it doesn't. - let vm_size = get_relsize(modification, vm_rel, ctx).await?; - if let Some(blknum) = new_vm_blk { - if blknum >= vm_size { - new_vm_blk = None; - } - } - if let Some(blknum) = old_vm_blk { - if blknum >= vm_size { - old_vm_blk = None; - } - } - - if new_vm_blk.is_some() || old_vm_blk.is_some() { - if new_vm_blk == old_vm_blk { - // An UPDATE record that needs to clear the bits for both old and the - // new page, both of which reside on the same VM page. - self.put_rel_wal_record( - modification, - vm_rel, - new_vm_blk.unwrap(), - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno, - old_heap_blkno, - flags, - }, - ctx, - ) - .await?; - } else { - // Clear VM bits for one heap page, or for two pages that reside on - // different VM pages. - if let Some(new_vm_blk) = new_vm_blk { - self.put_rel_wal_record( - modification, - vm_rel, - new_vm_blk, - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno, - old_heap_blkno: None, - flags, - }, - ctx, - ) - .await?; - } - if let Some(old_vm_blk) = old_vm_blk { - self.put_rel_wal_record( - modification, - vm_rel, - old_vm_blk, - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno: None, - old_heap_blkno, - flags, - }, - ctx, - ) - .await?; - } - } - } + Ok(Some(NeonrmgrRecord::ClearVmBits(ClearVmBits { + new_heap_blkno, + old_heap_blkno, + vm_rel, + flags, + }))) + } else { + Ok(None) } - - Ok(()) } /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record. async fn ingest_xlog_dbase_create( &mut self, + create: DbaseCreate, modification: &mut DatadirModification<'_>, - rec: &XlCreateDatabase, ctx: &RequestContext, ) -> anyhow::Result<()> { - let db_id = rec.db_id; - let tablespace_id = rec.tablespace_id; - let src_db_id = rec.src_db_id; - let src_tablespace_id = rec.src_tablespace_id; + let DbaseCreate { + db_id, + tablespace_id, + src_db_id, + src_tablespace_id, + } = create; let rels = modification .tline @@ -1349,46 +1239,209 @@ impl WalIngest { Ok(()) } - async fn ingest_xlog_smgr_create( + async fn ingest_xlog_dbase_drop( &mut self, + dbase_drop: DbaseDrop, modification: &mut DatadirModification<'_>, - rec: &XlSmgrCreate, ctx: &RequestContext, ) -> anyhow::Result<()> { - let rel = RelTag { - spcnode: rec.rnode.spcnode, - dbnode: rec.rnode.dbnode, - relnode: rec.rnode.relnode, - forknum: rec.forknum, - }; + let DbaseDrop { + db_id, + tablespace_ids, + } = dbase_drop; + for tablespace_id in tablespace_ids { + trace!("Drop db {}, {}", tablespace_id, db_id); + modification.drop_dbdir(tablespace_id, db_id, ctx).await?; + } + + Ok(()) + } + + fn decode_dbase_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + // TODO: Refactor this to avoid the duplication between postgres versions. + + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + debug!(%info, %pg_version, "handle RM_DBASE_ID"); + + if pg_version == 14 { + if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE { + let createdb = XlCreateDatabase::decode(buf); + debug!("XLOG_DBASE_CREATE v14"); + + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } else if pg_version == 15 { + if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } else if pg_version == 16 { + if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } else if pg_version == 17 { + if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } + + Ok(None) + } + + async fn ingest_xlog_smgr_create( + &mut self, + create: SmgrCreate, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let SmgrCreate { rel } = create; self.put_rel_creation(modification, rel, ctx).await?; Ok(()) } + fn decode_smgr_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_SMGR_CREATE { + let create = XlSmgrCreate::decode(buf); + let rel = RelTag { + spcnode: create.rnode.spcnode, + dbnode: create.rnode.dbnode, + relnode: create.rnode.relnode, + forknum: create.forknum, + }; + + return Ok(Some(SmgrRecord::Create(SmgrCreate { rel }))); + } else if info == pg_constants::XLOG_SMGR_TRUNCATE { + let truncate = XlSmgrTruncate::decode(buf); + return Ok(Some(SmgrRecord::Truncate(truncate))); + } + + Ok(None) + } + /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record. /// /// This is the same logic as in PostgreSQL's smgr_redo() function. async fn ingest_xlog_smgr_truncate( &mut self, + truncate: XlSmgrTruncate, modification: &mut DatadirModification<'_>, - rec: &XlSmgrTruncate, ctx: &RequestContext, ) -> anyhow::Result<()> { - let spcnode = rec.rnode.spcnode; - let dbnode = rec.rnode.dbnode; - let relnode = rec.rnode.relnode; + let XlSmgrTruncate { + blkno, + rnode, + flags, + } = truncate; - if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 { + let spcnode = rnode.spcnode; + let dbnode = rnode.dbnode; + let relnode = rnode.relnode; + + if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 { let rel = RelTag { spcnode, dbnode, relnode, forknum: MAIN_FORKNUM, }; - self.put_rel_truncation(modification, rel, rec.blkno, ctx) + + self.put_rel_truncation(modification, rel, blkno, ctx) .await?; } - if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 { + if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 { let rel = RelTag { spcnode, dbnode, @@ -1396,9 +1449,9 @@ impl WalIngest { forknum: FSM_FORKNUM, }; - let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE; + let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE; let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no); - if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 { + if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 { // Tail of last remaining FSM page has to be zeroed. // We are not precise here and instead of digging in FSM bitmap format just clear the whole page. modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?; @@ -1411,7 +1464,7 @@ impl WalIngest { .await?; } } - if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 { + if flags & pg_constants::SMGR_TRUNCATE_VM != 0 { let rel = RelTag { spcnode, dbnode, @@ -1419,8 +1472,8 @@ impl WalIngest { forknum: VISIBILITYMAP_FORKNUM, }; - let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE; - if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 { + let mut vm_page_no = blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE; + if blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 { // Tail of last remaining vm page has to be zeroed. // We are not precise here and instead of digging in VM bitmap format just clear the whole page. modification.put_rel_page_image_zero(rel, vm_page_no)?; @@ -1493,12 +1546,32 @@ impl WalIngest { /// async fn ingest_xact_record( &mut self, + record: XactRecord, modification: &mut DatadirModification<'_>, - parsed: &XlXactParsedRecord, - is_commit: bool, - origin_id: u16, ctx: &RequestContext, ) -> anyhow::Result<()> { + let (xact_common, is_commit, is_prepared) = match record { + XactRecord::Prepare(XactPrepare { xl_xid, data }) => { + let xid: u64 = if modification.tline.pg_version >= 17 { + self.adjust_to_full_transaction_id(xl_xid)? + } else { + xl_xid as u64 + }; + return modification.put_twophase_file(xid, data, ctx).await; + } + XactRecord::Commit(common) => (common, true, false), + XactRecord::Abort(common) => (common, false, false), + XactRecord::CommitPrepared(common) => (common, true, true), + XactRecord::AbortPrepared(common) => (common, false, true), + }; + + let XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + } = xact_common; + // Record update of CLOG pages let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE; let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; @@ -1569,18 +1642,95 @@ impl WalIngest { .set_replorigin(origin_id, parsed.origin_lsn) .await?; } + + if is_prepared { + // Remove twophase file. see RemoveTwoPhaseFile() in postgres code + trace!( + "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}", + xl_xid, + parsed.xid, + lsn, + ); + + let xid: u64 = if modification.tline.pg_version >= 17 { + self.adjust_to_full_transaction_id(parsed.xid)? + } else { + parsed.xid as u64 + }; + modification.drop_twophase_file(xid, ctx).await?; + } + Ok(()) } - async fn ingest_clog_truncate_record( + // TODO(vlad): Standardise interface for `decode_...` + fn decode_xact_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + lsn: Lsn, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; + let origin_id = decoded.origin_id; + let xl_xid = decoded.xl_xid; + + if info == pg_constants::XLOG_XACT_COMMIT { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(XactRecord::Commit(XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + }))); + } else if info == pg_constants::XLOG_XACT_ABORT { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(XactRecord::Abort(XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + }))); + } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(XactRecord::CommitPrepared(XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + }))); + } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(XactRecord::AbortPrepared(XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + }))); + } else if info == pg_constants::XLOG_XACT_PREPARE { + return Ok(Some(XactRecord::Prepare(XactPrepare { + xl_xid: decoded.xl_xid, + data: Bytes::copy_from_slice(&buf[..]), + }))); + } + + Ok(None) + } + + async fn ingest_clog_truncate( &mut self, + truncate: ClogTruncate, modification: &mut DatadirModification<'_>, - xlrec: &XlClogTruncate, ctx: &RequestContext, ) -> anyhow::Result<()> { + let ClogTruncate { + pageno, + oldest_xid, + oldest_xid_db, + } = truncate; + info!( "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}", - xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db + pageno, oldest_xid, oldest_xid_db ); // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is @@ -1588,8 +1738,8 @@ impl WalIngest { // later. In Neon, a server can start at any LSN, not just on a checkpoint record, // so we keep the oldestXid and oldestXidDB up-to-date. enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { - cp.oldestXid = xlrec.oldest_xid; - cp.oldestXidDB = xlrec.oldest_xid_db; + cp.oldestXid = oldest_xid; + cp.oldestXidDB = oldest_xid_db; }); self.checkpoint_modified = true; @@ -1606,7 +1756,7 @@ impl WalIngest { // the current endpoint page must not be eligible for removal. // See SimpleLruTruncate() in slru.c if dispatch_pgversion!(modification.tline.pg_version, { - pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, xlrec.pageno) + pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno) }) { info!("could not truncate directory pg_xact apparent wraparound"); return Ok(()); @@ -1626,7 +1776,7 @@ impl WalIngest { let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; let may_delete = dispatch_pgversion!(modification.tline.pg_version, { - pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, xlrec.pageno) + pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno) }); if may_delete { @@ -1640,7 +1790,55 @@ impl WalIngest { Ok(()) } - fn ingest_multixact_create_record( + async fn ingest_clog_zero_page( + &mut self, + zero_page: ClogZeroPage, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let ClogZeroPage { segno, rpageno } = zero_page; + + self.put_slru_page_image( + modification, + SlruKind::Clog, + segno, + rpageno, + ZERO_PAGE.clone(), + ctx, + ) + .await + } + + fn decode_clog_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; + + if info == pg_constants::CLOG_ZEROPAGE { + let pageno = if pg_version < 17 { + buf.get_u32_le() + } else { + buf.get_u64_le() as u32 + }; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + Ok(Some(ClogRecord::ZeroPage(ClogZeroPage { segno, rpageno }))) + } else { + assert!(info == pg_constants::CLOG_TRUNCATE); + let xlrec = XlClogTruncate::decode(buf, pg_version); + + Ok(Some(ClogRecord::Truncate(ClogTruncate { + pageno: xlrec.pageno, + oldest_xid: xlrec.oldest_xid, + oldest_xid_db: xlrec.oldest_xid_db, + }))) + } + } + + fn ingest_multixact_create( &mut self, modification: &mut DatadirModification, xlrec: &XlMultiXactCreate, @@ -1742,7 +1940,7 @@ impl WalIngest { Ok(()) } - async fn ingest_multixact_truncate_record( + async fn ingest_multixact_truncate( &mut self, modification: &mut DatadirModification<'_>, xlrec: &XlMultiXactTruncate, @@ -1788,26 +1986,315 @@ impl WalIngest { Ok(()) } - async fn ingest_relmap_page( + async fn ingest_multixact_zero_page( &mut self, + zero_page: MultiXactZeroPage, modification: &mut DatadirModification<'_>, - xlrec: &XlRelmapUpdate, - decoded: &DecodedWALRecord, ctx: &RequestContext, ) -> Result<()> { + let MultiXactZeroPage { + slru_kind, + segno, + rpageno, + } = zero_page; + self.put_slru_page_image( + modification, + slru_kind, + segno, + rpageno, + ZERO_PAGE.clone(), + ctx, + ) + .await + } + + fn decode_multixact_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE + || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE + { + let pageno = if pg_version < 17 { + buf.get_u32_le() + } else { + buf.get_u64_le() as u32 + }; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + let slru_kind = match info { + pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets, + pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers, + _ => unreachable!(), + }; + + return Ok(Some(MultiXactRecord::ZeroPage(MultiXactZeroPage { + slru_kind, + segno, + rpageno, + }))); + } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(buf); + return Ok(Some(MultiXactRecord::Create(xlrec))); + } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { + let xlrec = XlMultiXactTruncate::decode(buf); + return Ok(Some(MultiXactRecord::Truncate(xlrec))); + } + + Ok(None) + } + + async fn ingest_relmap_update( + &mut self, + update: RelmapUpdate, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> Result<()> { + let RelmapUpdate { update, buf } = update; + + modification + .put_relmap_file(update.tsid, update.dbid, buf, ctx) + .await + } + + fn decode_relmap_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let update = XlRelmapUpdate::decode(buf); + let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); // skip xl_relmap_update buf.advance(12); - modification - .put_relmap_file( - xlrec.tsid, - xlrec.dbid, - Bytes::copy_from_slice(&buf[..]), - ctx, - ) - .await + Ok(Some(RelmapRecord::Update(RelmapUpdate { + update, + buf: Bytes::copy_from_slice(&buf[..]), + }))) + } + + async fn ingest_raw_xlog_record( + &mut self, + raw_record: RawXlogRecord, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> Result<()> { + let RawXlogRecord { info, lsn, mut buf } = raw_record; + let pg_version = modification.tline.pg_version; + + if info == pg_constants::XLOG_PARAMETER_CHANGE { + if let CheckPoint::V17(cp) = &mut self.checkpoint { + let rec = v17::XlParameterChange::decode(&mut buf); + cp.wal_level = rec.wal_level; + self.checkpoint_modified = true; + } + } else if info == pg_constants::XLOG_END_OF_RECOVERY { + if let CheckPoint::V17(cp) = &mut self.checkpoint { + let rec = v17::XlEndOfRecovery::decode(&mut buf); + cp.wal_level = rec.wal_level; + self.checkpoint_modified = true; + } + } + + enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { + if info == pg_constants::XLOG_NEXTOID { + let next_oid = buf.get_u32_le(); + if cp.nextOid != next_oid { + cp.nextOid = next_oid; + self.checkpoint_modified = true; + } + } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE + || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN + { + let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT]; + buf.copy_to_slice(&mut checkpoint_bytes); + let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?; + trace!( + "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", + xlog_checkpoint.oldestXid, + cp.oldestXid + ); + if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 { + cp.oldestXid = xlog_checkpoint.oldestXid; + } + trace!( + "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}", + xlog_checkpoint.oldestActiveXid, + cp.oldestActiveXid + ); + + // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`, + // because at shutdown, all in-progress transactions will implicitly + // end. Postgres startup code knows that, and allows hot standby to start + // immediately from a shutdown checkpoint. + // + // In Neon, Postgres hot standby startup always behaves as if starting from + // an online checkpoint. It needs a valid `oldestActiveXid` value, so + // instead of overwriting self.checkpoint.oldestActiveXid with + // InvalidTransactionid from the checkpoint WAL record, update it to a + // proper value, knowing that there are no in-progress transactions at this + // point, except for prepared transactions. + // + // See also the neon code changes in the InitWalRecovery() function. + if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID + && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN + { + let oldest_active_xid = if pg_version >= 17 { + let mut oldest_active_full_xid = cp.nextXid.value; + for xid in modification.tline.list_twophase_files(lsn, ctx).await? { + if xid < oldest_active_full_xid { + oldest_active_full_xid = xid; + } + } + oldest_active_full_xid as u32 + } else { + let mut oldest_active_xid = cp.nextXid.value as u32; + for xid in modification.tline.list_twophase_files(lsn, ctx).await? { + let narrow_xid = xid as u32; + if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 { + oldest_active_xid = narrow_xid; + } + } + oldest_active_xid + }; + cp.oldestActiveXid = oldest_active_xid; + } else { + cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid; + } + + // Write a new checkpoint key-value pair on every checkpoint record, even + // if nothing really changed. Not strictly required, but it seems nice to + // have some trace of the checkpoint records in the layer files at the same + // LSNs. + self.checkpoint_modified = true; + } + }); + + Ok(()) + } + + fn decode_xlog_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + lsn: Lsn, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + Ok(Some(XlogRecord::Raw(RawXlogRecord { + info, + lsn, + buf: buf.clone(), + }))) + } + + async fn ingest_logical_message_put( + &mut self, + put: PutLogicalMessage, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> Result<()> { + let PutLogicalMessage { path, buf } = put; + modification.put_file(path.as_str(), &buf, ctx).await + } + + fn decode_logical_message_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_LOGICAL_MESSAGE { + let xlrec = crate::walrecord::XlLogicalMessage::decode(buf); + let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?; + + #[cfg(feature = "testing")] + if prefix == "neon-test" { + return Ok(Some(LogicalMessageRecord::Failpoint)); + } + + if let Some(path) = prefix.strip_prefix("neon-file:") { + let buf_size = xlrec.prefix_size + xlrec.message_size; + let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]); + return Ok(Some(LogicalMessageRecord::Put(PutLogicalMessage { + path: path.to_string(), + buf, + }))); + } + } + + Ok(None) + } + + fn decode_standby_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_RUNNING_XACTS { + let xlrec = crate::walrecord::XlRunningXacts::decode(buf); + return Ok(Some(StandbyRecord::RunningXacts(StandbyRunningXacts { + oldest_running_xid: xlrec.oldest_running_xid, + }))); + } + + Ok(None) + } + + fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> { + match record { + StandbyRecord::RunningXacts(running_xacts) => { + enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { + cp.oldestActiveXid = running_xacts.oldest_running_xid; + }); + + self.checkpoint_modified = true; + } + } + + Ok(()) + } + + fn decode_replorigin_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_REPLORIGIN_SET { + let xlrec = crate::walrecord::XlReploriginSet::decode(buf); + return Ok(Some(ReploriginRecord::Set(xlrec))); + } else if info == pg_constants::XLOG_REPLORIGIN_DROP { + let xlrec = crate::walrecord::XlReploriginDrop::decode(buf); + return Ok(Some(ReploriginRecord::Drop(xlrec))); + } + + Ok(None) + } + + async fn ingest_replorigin_record( + &mut self, + record: ReploriginRecord, + modification: &mut DatadirModification<'_>, + ) -> Result<()> { + match record { + ReploriginRecord::Set(set) => { + modification + .set_replorigin(set.node_id, set.remote_lsn) + .await?; + } + ReploriginRecord::Drop(drop) => { + modification.drop_replorigin(drop.node_id).await?; + } + } + + Ok(()) } async fn put_rel_creation( diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py index 5561a128b7..fc9adb14c9 100644 --- a/test_runner/regress/test_tenant_relocation.py +++ b/test_runner/regress/test_tenant_relocation.py @@ -435,7 +435,9 @@ def test_emergency_relocate_with_branches_slow_replay( # This fail point will pause the WAL ingestion on the main branch, after the # the first insert - pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")]) + pageserver_http.configure_failpoints( + [("pageserver-wal-ingest-logical-message-sleep", "return(5000)")] + ) # Attach and wait a few seconds to give it time to load the tenants, attach to the # safekeepers, and to stream and ingest the WAL up to the pause-point. @@ -453,11 +455,13 @@ def test_emergency_relocate_with_branches_slow_replay( assert cur.fetchall() == [("before pause",), ("after pause",)] # Sanity check that the failpoint was reached - env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + env.pageserver.assert_log_contains( + 'failpoint "pageserver-wal-ingest-logical-message-sleep": sleep done' + ) assert time.time() - before_attach_time > 5 # Clean up - pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off")) + pageserver_http.configure_failpoints(("pageserver-wal-ingest-logical-message-sleep", "off")) # Simulate hard crash of pageserver and re-attach a tenant with a branch @@ -581,7 +585,9 @@ def test_emergency_relocate_with_branches_createdb( # bug reproduced easily even without this, as there is always some delay between # loading the timeline and establishing the connection to the safekeeper to stream and # ingest the WAL, but let's make this less dependent on accidental timing. - pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")]) + pageserver_http.configure_failpoints( + [("pageserver-wal-ingest-logical-message-sleep", "return(5000)")] + ) before_attach_time = time.time() env.pageserver.tenant_attach(tenant_id) @@ -590,8 +596,10 @@ def test_emergency_relocate_with_branches_createdb( assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200 # Sanity check that the failpoint was reached - env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + env.pageserver.assert_log_contains( + 'failpoint "pageserver-wal-ingest-logical-message-sleep": sleep done' + ) assert time.time() - before_attach_time > 5 # Clean up - pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off")) + pageserver_http.configure_failpoints(("pageserver-wal-ingest-logical-message-sleep", "off"))