diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index d3e8bf59f2..8a4c0554f8 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -107,6 +107,143 @@ struct WarnIngestLag { timestamp_invalid_msg_ratelimit: RateLimit, } +// These structs are an intermediary representation of the PostgreSQL WAL records. +// The ones prefixed with `Xl` are lower level, while the ones that are not have +// all the required context to be acted upon by the pageserver. + +enum HeapamRecord { + ClearVmBits(ClearVmBits), +} + +struct ClearVmBits { + new_heap_blkno: Option, + old_heap_blkno: Option, + vm_rel: RelTag, + flags: u8, +} + +enum NeonrmgrRecord { + ClearVmBits(ClearVmBits), +} + +enum SmgrRecord { + Create(SmgrCreate), + Truncate(XlSmgrTruncate), +} + +struct SmgrCreate { + rel: RelTag, +} + +enum DbaseRecord { + Create(DbaseCreate), + Drop(DbaseDrop), +} + +struct DbaseCreate { + db_id: u32, + tablespace_id: u32, + src_db_id: u32, + src_tablespace_id: u32, +} + +struct DbaseDrop { + db_id: u32, + tablespace_ids: Vec, +} + +enum ClogRecord { + ZeroPage(ClogZeroPage), + Truncate(ClogTruncate), +} + +struct ClogZeroPage { + segno: u32, + rpageno: u32, +} + +struct ClogTruncate { + pageno: u32, + oldest_xid: u32, + oldest_xid_db: u32, +} + +enum XactRecord { + Commit(XactCommon), + Abort(XactCommon), + CommitPrepared(XactCommon), + AbortPrepared(XactCommon), + Prepare(XactPrepare), +} + +struct XactCommon { + parsed: XlXactParsedRecord, + origin_id: u16, + // Fields below are only used for logging + xl_xid: u32, + lsn: Lsn, +} + +struct XactPrepare { + xl_xid: u32, + data: Bytes, +} + +enum MultiXactRecord { + ZeroPage(MultiXactZeroPage), + Create(XlMultiXactCreate), + Truncate(XlMultiXactTruncate), +} + +struct MultiXactZeroPage { + slru_kind: SlruKind, + segno: u32, + rpageno: u32, +} + +enum RelmapRecord { + Update(RelmapUpdate), +} + +struct RelmapUpdate { + update: XlRelmapUpdate, + buf: Bytes, +} + +enum XlogRecord { + Raw(RawXlogRecord), +} + +struct RawXlogRecord { + info: u8, + lsn: Lsn, + buf: Bytes, +} + +enum LogicalMessageRecord { + Put(PutLogicalMessage), + #[cfg(feature = "testing")] + Failpoint, +} + +struct PutLogicalMessage { + path: String, + buf: Bytes, +} + +enum StandbyRecord { + RunningXacts(StandbyRunningXacts), +} + +struct StandbyRunningXacts { + oldest_running_xid: u32, +} + +enum ReploriginRecord { + Set(XlReploriginSet), + Drop(XlReploriginDrop), +} + impl WalIngest { pub async fn new( timeline: &Timeline, @@ -182,105 +319,58 @@ impl WalIngest { pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => { // Heap AM records need some special handling, because they modify VM pages // without registering them with the standard mechanism. - self.ingest_heapam_record(&mut buf, modification, &decoded, ctx) - .await?; + let maybe_heapam_record = + Self::decode_heapam_record(&mut buf, &decoded, pg_version)?; + if let Some(heapam_record) = maybe_heapam_record { + match heapam_record { + HeapamRecord::ClearVmBits(clear_vm_bits) => { + self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx) + .await?; + } + } + } } pg_constants::RM_NEON_ID => { - self.ingest_neonrmgr_record(&mut buf, modification, &decoded, ctx) - .await?; + let maybe_nenonrmgr_record = + Self::decode_neonmgr_record(&mut buf, &decoded, pg_version)?; + if let Some(neonrmgr_record) = maybe_nenonrmgr_record { + match neonrmgr_record { + NeonrmgrRecord::ClearVmBits(clear_vm_bits) => { + self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx) + .await?; + } + } + } } // Handle other special record types pg_constants::RM_SMGR_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_SMGR_CREATE { - let create = XlSmgrCreate::decode(&mut buf); - self.ingest_xlog_smgr_create(modification, &create, ctx) - .await?; - } else if info == pg_constants::XLOG_SMGR_TRUNCATE { - let truncate = XlSmgrTruncate::decode(&mut buf); - self.ingest_xlog_smgr_truncate(modification, &truncate, ctx) - .await?; + let maybe_smgr_record = + Self::decode_smgr_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(smgr_record) = maybe_smgr_record { + match smgr_record { + SmgrRecord::Create(create) => { + self.ingest_xlog_smgr_create(create, modification, ctx) + .await?; + } + SmgrRecord::Truncate(truncate) => { + self.ingest_xlog_smgr_truncate(truncate, modification, ctx) + .await?; + } + } } } pg_constants::RM_DBASE_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - debug!(%info, %pg_version, "handle RM_DBASE_ID"); + let maybe_dbase_record = + Self::decode_dbase_record(&mut buf, &decoded, pg_version).unwrap(); - if pg_version == 14 { - if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE { - let createdb = XlCreateDatabase::decode(&mut buf); - debug!("XLOG_DBASE_CREATE v14"); - - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) + if let Some(dbase_record) = maybe_dbase_record { + match dbase_record { + DbaseRecord::Create(create) => { + self.ingest_xlog_dbase_create(create, modification, ctx) .await?; } - } - } else if pg_version == 15 { - if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; - } - } - } else if pg_version == 16 { - if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; - } - } - } else if pg_version == 17 { - if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb, ctx) - .await?; - } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification - .drop_dbdir(tablespace_id, dropdb.db_id, ctx) - .await?; + DbaseRecord::Drop(drop) => { + self.ingest_xlog_dbase_drop(drop, modification, ctx).await?; } } } @@ -289,266 +379,113 @@ impl WalIngest { trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); } pg_constants::RM_CLOG_ID => { - let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; + // [`Self::decode_clog_record`] may never fail and always returns. + // It has this interface to match all the other decoding methods. + let clog_record = Self::decode_clog_record(&mut buf, &decoded, pg_version) + .unwrap() + .unwrap(); - if info == pg_constants::CLOG_ZEROPAGE { - let pageno = if pg_version < 17 { - buf.get_u32_le() - } else { - buf.get_u64_le() as u32 - }; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - self.put_slru_page_image( - modification, - SlruKind::Clog, - segno, - rpageno, - ZERO_PAGE.clone(), - ctx, - ) - .await?; - } else { - assert!(info == pg_constants::CLOG_TRUNCATE); - let xlrec = XlClogTruncate::decode(&mut buf, pg_version); - self.ingest_clog_truncate_record(modification, &xlrec, ctx) - .await?; + match clog_record { + ClogRecord::ZeroPage(zero_page) => { + self.ingest_clog_zero_page(zero_page, modification, ctx) + .await?; + } + ClogRecord::Truncate(truncate) => { + self.ingest_clog_truncate(truncate, modification, ctx) + .await?; + } } } pg_constants::RM_XACT_ID => { - let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; - - if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT { - let parsed_xact = - XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - self.ingest_xact_record( - modification, - &parsed_xact, - info == pg_constants::XLOG_XACT_COMMIT, - decoded.origin_id, - ctx, - ) - .await?; - } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED - || info == pg_constants::XLOG_XACT_ABORT_PREPARED - { - let parsed_xact = - XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - self.ingest_xact_record( - modification, - &parsed_xact, - info == pg_constants::XLOG_XACT_COMMIT_PREPARED, - decoded.origin_id, - ctx, - ) - .await?; - // Remove twophase file. see RemoveTwoPhaseFile() in postgres code - trace!( - "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}", - decoded.xl_xid, - parsed_xact.xid, - lsn, - ); - - let xid: u64 = if pg_version >= 17 { - self.adjust_to_full_transaction_id(parsed_xact.xid)? - } else { - parsed_xact.xid as u64 - }; - modification.drop_twophase_file(xid, ctx).await?; - } else if info == pg_constants::XLOG_XACT_PREPARE { - let xid: u64 = if pg_version >= 17 { - self.adjust_to_full_transaction_id(decoded.xl_xid)? - } else { - decoded.xl_xid as u64 - }; - modification - .put_twophase_file(xid, Bytes::copy_from_slice(&buf[..]), ctx) + let maybe_xact_record = + Self::decode_xact_record(&mut buf, &decoded, lsn, pg_version).unwrap(); + if let Some(xact_record) = maybe_xact_record { + self.ingest_xact_record(xact_record, modification, ctx) .await?; } } pg_constants::RM_MULTIXACT_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { - let pageno = if pg_version < 17 { - buf.get_u32_le() - } else { - buf.get_u64_le() as u32 - }; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - self.put_slru_page_image( - modification, - SlruKind::MultiXactOffsets, - segno, - rpageno, - ZERO_PAGE.clone(), - ctx, - ) - .await?; - } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { - let pageno = if pg_version < 17 { - buf.get_u32_le() - } else { - buf.get_u64_le() as u32 - }; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - self.put_slru_page_image( - modification, - SlruKind::MultiXactMembers, - segno, - rpageno, - ZERO_PAGE.clone(), - ctx, - ) - .await?; - } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(&mut buf); - self.ingest_multixact_create_record(modification, &xlrec)?; - } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { - let xlrec = XlMultiXactTruncate::decode(&mut buf); - self.ingest_multixact_truncate_record(modification, &xlrec, ctx) - .await?; + let maybe_multixact_record = + Self::decode_multixact_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(multixact_record) = maybe_multixact_record { + match multixact_record { + MultiXactRecord::ZeroPage(zero_page) => { + self.ingest_multixact_zero_page(zero_page, modification, ctx) + .await?; + } + MultiXactRecord::Create(create) => { + self.ingest_multixact_create(modification, &create)?; + } + MultiXactRecord::Truncate(truncate) => { + self.ingest_multixact_truncate(modification, &truncate, ctx) + .await?; + } + } } } pg_constants::RM_RELMAP_ID => { - let xlrec = XlRelmapUpdate::decode(&mut buf); - self.ingest_relmap_page(modification, &xlrec, &decoded, ctx) - .await?; - } - pg_constants::RM_XLOG_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_PARAMETER_CHANGE { - if let CheckPoint::V17(cp) = &mut self.checkpoint { - let rec = v17::XlParameterChange::decode(&mut buf); - cp.wal_level = rec.wal_level; - self.checkpoint_modified = true; - } - } else if info == pg_constants::XLOG_END_OF_RECOVERY { - if let CheckPoint::V17(cp) = &mut self.checkpoint { - let rec = v17::XlEndOfRecovery::decode(&mut buf); - cp.wal_level = rec.wal_level; - self.checkpoint_modified = true; + let relmap_record = Self::decode_relmap_record(&mut buf, &decoded, pg_version) + .unwrap() + .unwrap(); + match relmap_record { + RelmapRecord::Update(update) => { + self.ingest_relmap_update(update, modification, ctx).await?; } } + } + // This is an odd duck. It needs to go to all shards. + // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY + // in WalIngest::new), we have to send the whole DecodedWalRecord::record to + // the pageserver and decode it there. + // + // Alternatively, one can make the checkpoint part of the subscription protocol + // to the pageserver. This should work fine, but can be done at a later point. + pg_constants::RM_XLOG_ID => { + let xlog_record = Self::decode_xlog_record(&mut buf, &decoded, lsn, pg_version) + .unwrap() + .unwrap(); - enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { - if info == pg_constants::XLOG_NEXTOID { - let next_oid = buf.get_u32_le(); - if cp.nextOid != next_oid { - cp.nextOid = next_oid; - self.checkpoint_modified = true; - } - } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE - || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN - { - let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT]; - buf.copy_to_slice(&mut checkpoint_bytes); - let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?; - trace!( - "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", - xlog_checkpoint.oldestXid, - cp.oldestXid - ); - if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 { - cp.oldestXid = xlog_checkpoint.oldestXid; - } - trace!( - "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}", - xlog_checkpoint.oldestActiveXid, - cp.oldestActiveXid - ); - - // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`, - // because at shutdown, all in-progress transactions will implicitly - // end. Postgres startup code knows that, and allows hot standby to start - // immediately from a shutdown checkpoint. - // - // In Neon, Postgres hot standby startup always behaves as if starting from - // an online checkpoint. It needs a valid `oldestActiveXid` value, so - // instead of overwriting self.checkpoint.oldestActiveXid with - // InvalidTransactionid from the checkpoint WAL record, update it to a - // proper value, knowing that there are no in-progress transactions at this - // point, except for prepared transactions. - // - // See also the neon code changes in the InitWalRecovery() function. - if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID - && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN - { - let oldest_active_xid = if pg_version >= 17 { - let mut oldest_active_full_xid = cp.nextXid.value; - for xid in modification.tline.list_twophase_files(lsn, ctx).await? { - if xid < oldest_active_full_xid { - oldest_active_full_xid = xid; - } - } - oldest_active_full_xid as u32 - } else { - let mut oldest_active_xid = cp.nextXid.value as u32; - for xid in modification.tline.list_twophase_files(lsn, ctx).await? { - let narrow_xid = xid as u32; - if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 { - oldest_active_xid = narrow_xid; - } - } - oldest_active_xid - }; - cp.oldestActiveXid = oldest_active_xid; - } else { - cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid; - } - - // Write a new checkpoint key-value pair on every checkpoint record, even - // if nothing really changed. Not strictly required, but it seems nice to - // have some trace of the checkpoint records in the layer files at the same - // LSNs. - self.checkpoint_modified = true; + match xlog_record { + XlogRecord::Raw(raw) => { + self.ingest_raw_xlog_record(raw, modification, ctx).await?; } - }); + } } pg_constants::RM_LOGICALMSG_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_LOGICAL_MESSAGE { - let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf); - let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?; - let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size]; - if prefix == "neon-test" { - // This is a convenient way to make the WAL ingestion pause at - // particular point in the WAL. For more fine-grained control, - // we could peek into the message and only pause if it contains - // a particular string, for example, but this is enough for now. - failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep"); - } else if let Some(path) = prefix.strip_prefix("neon-file:") { - modification.put_file(path, message, ctx).await?; + let maybe_logical_message_record = + Self::decode_logical_message_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(logical_message_record) = maybe_logical_message_record { + match logical_message_record { + LogicalMessageRecord::Put(put) => { + self.ingest_logical_message_put(put, modification, ctx) + .await?; + } + #[cfg(feature = "testing")] + LogicalMessageRecord::Failpoint => { + // This is a convenient way to make the WAL ingestion pause at + // particular point in the WAL. For more fine-grained control, + // we could peek into the message and only pause if it contains + // a particular string, for example, but this is enough for now. + failpoint_support::sleep_millis_async!( + "pageserver-wal-ingest-logical-message-sleep" + ); + } } } } pg_constants::RM_STANDBY_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_RUNNING_XACTS { - let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf); - - enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { - cp.oldestActiveXid = xlrec.oldest_running_xid; - }); - - self.checkpoint_modified = true; + let maybe_standby_record = + Self::decode_standby_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(standby_record) = maybe_standby_record { + self.ingest_standby_record(standby_record).unwrap(); } } pg_constants::RM_REPLORIGIN_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_REPLORIGIN_SET { - let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf); - modification - .set_replorigin(xlrec.node_id, xlrec.remote_lsn) - .await? - } else if info == pg_constants::XLOG_REPLORIGIN_DROP { - let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf); - modification.drop_replorigin(xlrec.node_id).await? + let maybe_replorigin_record = + Self::decode_replorigin_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(replorigin_record) = maybe_replorigin_record { + self.ingest_replorigin_record(replorigin_record, modification) + .await?; } } _x => { @@ -709,13 +646,99 @@ impl WalIngest { Ok(()) } - async fn ingest_heapam_record( + async fn ingest_clear_vm_bits( &mut self, - buf: &mut Bytes, + clear_vm_bits: ClearVmBits, modification: &mut DatadirModification<'_>, - decoded: &DecodedWALRecord, ctx: &RequestContext, ) -> anyhow::Result<()> { + let ClearVmBits { + new_heap_blkno, + old_heap_blkno, + flags, + vm_rel, + } = clear_vm_bits; + // Clear the VM bits if required. + let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); + let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); + + // Sometimes, Postgres seems to create heap WAL records with the + // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is + // not set. In fact, it's possible that the VM page does not exist at all. + // In that case, we don't want to store a record to clear the VM bit; + // replaying it would fail to find the previous image of the page, because + // it doesn't exist. So check if the VM page(s) exist, and skip the WAL + // record if it doesn't. + let vm_size = get_relsize(modification, vm_rel, ctx).await?; + if let Some(blknum) = new_vm_blk { + if blknum >= vm_size { + new_vm_blk = None; + } + } + if let Some(blknum) = old_vm_blk { + if blknum >= vm_size { + old_vm_blk = None; + } + } + + if new_vm_blk.is_some() || old_vm_blk.is_some() { + if new_vm_blk == old_vm_blk { + // An UPDATE record that needs to clear the bits for both old and the + // new page, both of which reside on the same VM page. + self.put_rel_wal_record( + modification, + vm_rel, + new_vm_blk.unwrap(), + NeonWalRecord::ClearVisibilityMapFlags { + new_heap_blkno, + old_heap_blkno, + flags, + }, + ctx, + ) + .await?; + } else { + // Clear VM bits for one heap page, or for two pages that reside on + // different VM pages. + if let Some(new_vm_blk) = new_vm_blk { + self.put_rel_wal_record( + modification, + vm_rel, + new_vm_blk, + NeonWalRecord::ClearVisibilityMapFlags { + new_heap_blkno, + old_heap_blkno: None, + flags, + }, + ctx, + ) + .await?; + } + if let Some(old_vm_blk) = old_vm_blk { + self.put_rel_wal_record( + modification, + vm_rel, + old_vm_blk, + NeonWalRecord::ClearVisibilityMapFlags { + new_heap_blkno: None, + old_heap_blkno, + flags, + }, + ctx, + ) + .await?; + } + } + } + + Ok(()) + } + + fn decode_heapam_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { // Handle VM bit updates that are implicitly part of heap records. // First, look at the record to determine which VM bits need @@ -725,7 +748,7 @@ impl WalIngest { let mut old_heap_blkno: Option = None; let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; - match modification.tline.pg_version { + match pg_version { 14 => { if decoded.xl_rmid == pg_constants::RM_HEAP_ID { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; @@ -997,7 +1020,6 @@ impl WalIngest { _ => {} } - // Clear the VM bits if required. if new_heap_blkno.is_some() || old_heap_blkno.is_some() { let vm_rel = RelTag { forknum: VISIBILITYMAP_FORKNUM, @@ -1006,89 +1028,22 @@ impl WalIngest { relnode: decoded.blocks[0].rnode_relnode, }; - let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); - let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); - - // Sometimes, Postgres seems to create heap WAL records with the - // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is - // not set. In fact, it's possible that the VM page does not exist at all. - // In that case, we don't want to store a record to clear the VM bit; - // replaying it would fail to find the previous image of the page, because - // it doesn't exist. So check if the VM page(s) exist, and skip the WAL - // record if it doesn't. - let vm_size = get_relsize(modification, vm_rel, ctx).await?; - if let Some(blknum) = new_vm_blk { - if blknum >= vm_size { - new_vm_blk = None; - } - } - if let Some(blknum) = old_vm_blk { - if blknum >= vm_size { - old_vm_blk = None; - } - } - - if new_vm_blk.is_some() || old_vm_blk.is_some() { - if new_vm_blk == old_vm_blk { - // An UPDATE record that needs to clear the bits for both old and the - // new page, both of which reside on the same VM page. - self.put_rel_wal_record( - modification, - vm_rel, - new_vm_blk.unwrap(), - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno, - old_heap_blkno, - flags, - }, - ctx, - ) - .await?; - } else { - // Clear VM bits for one heap page, or for two pages that reside on - // different VM pages. - if let Some(new_vm_blk) = new_vm_blk { - self.put_rel_wal_record( - modification, - vm_rel, - new_vm_blk, - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno, - old_heap_blkno: None, - flags, - }, - ctx, - ) - .await?; - } - if let Some(old_vm_blk) = old_vm_blk { - self.put_rel_wal_record( - modification, - vm_rel, - old_vm_blk, - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno: None, - old_heap_blkno, - flags, - }, - ctx, - ) - .await?; - } - } - } + Ok(Some(HeapamRecord::ClearVmBits(ClearVmBits { + new_heap_blkno, + old_heap_blkno, + vm_rel, + flags, + }))) + } else { + Ok(None) } - - Ok(()) } - async fn ingest_neonrmgr_record( - &mut self, + fn decode_neonmgr_record( buf: &mut Bytes, - modification: &mut DatadirModification<'_>, decoded: &DecodedWALRecord, - ctx: &RequestContext, - ) -> anyhow::Result<()> { + pg_version: u32, + ) -> anyhow::Result> { // Handle VM bit updates that are implicitly part of heap records. // First, look at the record to determine which VM bits need @@ -1097,7 +1052,6 @@ impl WalIngest { let mut new_heap_blkno: Option = None; let mut old_heap_blkno: Option = None; let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; - let pg_version = modification.tline.pg_version; assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID); @@ -1168,7 +1122,6 @@ impl WalIngest { ), } - // Clear the VM bits if required. if new_heap_blkno.is_some() || old_heap_blkno.is_some() { let vm_rel = RelTag { forknum: VISIBILITYMAP_FORKNUM, @@ -1177,93 +1130,30 @@ impl WalIngest { relnode: decoded.blocks[0].rnode_relnode, }; - let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); - let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK); - - // Sometimes, Postgres seems to create heap WAL records with the - // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is - // not set. In fact, it's possible that the VM page does not exist at all. - // In that case, we don't want to store a record to clear the VM bit; - // replaying it would fail to find the previous image of the page, because - // it doesn't exist. So check if the VM page(s) exist, and skip the WAL - // record if it doesn't. - let vm_size = get_relsize(modification, vm_rel, ctx).await?; - if let Some(blknum) = new_vm_blk { - if blknum >= vm_size { - new_vm_blk = None; - } - } - if let Some(blknum) = old_vm_blk { - if blknum >= vm_size { - old_vm_blk = None; - } - } - - if new_vm_blk.is_some() || old_vm_blk.is_some() { - if new_vm_blk == old_vm_blk { - // An UPDATE record that needs to clear the bits for both old and the - // new page, both of which reside on the same VM page. - self.put_rel_wal_record( - modification, - vm_rel, - new_vm_blk.unwrap(), - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno, - old_heap_blkno, - flags, - }, - ctx, - ) - .await?; - } else { - // Clear VM bits for one heap page, or for two pages that reside on - // different VM pages. - if let Some(new_vm_blk) = new_vm_blk { - self.put_rel_wal_record( - modification, - vm_rel, - new_vm_blk, - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno, - old_heap_blkno: None, - flags, - }, - ctx, - ) - .await?; - } - if let Some(old_vm_blk) = old_vm_blk { - self.put_rel_wal_record( - modification, - vm_rel, - old_vm_blk, - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno: None, - old_heap_blkno, - flags, - }, - ctx, - ) - .await?; - } - } - } + Ok(Some(NeonrmgrRecord::ClearVmBits(ClearVmBits { + new_heap_blkno, + old_heap_blkno, + vm_rel, + flags, + }))) + } else { + Ok(None) } - - Ok(()) } /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record. async fn ingest_xlog_dbase_create( &mut self, + create: DbaseCreate, modification: &mut DatadirModification<'_>, - rec: &XlCreateDatabase, ctx: &RequestContext, ) -> anyhow::Result<()> { - let db_id = rec.db_id; - let tablespace_id = rec.tablespace_id; - let src_db_id = rec.src_db_id; - let src_tablespace_id = rec.src_tablespace_id; + let DbaseCreate { + db_id, + tablespace_id, + src_db_id, + src_tablespace_id, + } = create; let rels = modification .tline @@ -1349,46 +1239,209 @@ impl WalIngest { Ok(()) } - async fn ingest_xlog_smgr_create( + async fn ingest_xlog_dbase_drop( &mut self, + dbase_drop: DbaseDrop, modification: &mut DatadirModification<'_>, - rec: &XlSmgrCreate, ctx: &RequestContext, ) -> anyhow::Result<()> { - let rel = RelTag { - spcnode: rec.rnode.spcnode, - dbnode: rec.rnode.dbnode, - relnode: rec.rnode.relnode, - forknum: rec.forknum, - }; + let DbaseDrop { + db_id, + tablespace_ids, + } = dbase_drop; + for tablespace_id in tablespace_ids { + trace!("Drop db {}, {}", tablespace_id, db_id); + modification.drop_dbdir(tablespace_id, db_id, ctx).await?; + } + + Ok(()) + } + + fn decode_dbase_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + // TODO: Refactor this to avoid the duplication between postgres versions. + + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + debug!(%info, %pg_version, "handle RM_DBASE_ID"); + + if pg_version == 14 { + if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE { + let createdb = XlCreateDatabase::decode(buf); + debug!("XLOG_DBASE_CREATE v14"); + + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } else if pg_version == 15 { + if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } else if pg_version == 16 { + if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } else if pg_version == 17 { + if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + }); + + return Ok(Some(record)); + } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + }); + + return Ok(Some(record)); + } + } + + Ok(None) + } + + async fn ingest_xlog_smgr_create( + &mut self, + create: SmgrCreate, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let SmgrCreate { rel } = create; self.put_rel_creation(modification, rel, ctx).await?; Ok(()) } + fn decode_smgr_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_SMGR_CREATE { + let create = XlSmgrCreate::decode(buf); + let rel = RelTag { + spcnode: create.rnode.spcnode, + dbnode: create.rnode.dbnode, + relnode: create.rnode.relnode, + forknum: create.forknum, + }; + + return Ok(Some(SmgrRecord::Create(SmgrCreate { rel }))); + } else if info == pg_constants::XLOG_SMGR_TRUNCATE { + let truncate = XlSmgrTruncate::decode(buf); + return Ok(Some(SmgrRecord::Truncate(truncate))); + } + + Ok(None) + } + /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record. /// /// This is the same logic as in PostgreSQL's smgr_redo() function. async fn ingest_xlog_smgr_truncate( &mut self, + truncate: XlSmgrTruncate, modification: &mut DatadirModification<'_>, - rec: &XlSmgrTruncate, ctx: &RequestContext, ) -> anyhow::Result<()> { - let spcnode = rec.rnode.spcnode; - let dbnode = rec.rnode.dbnode; - let relnode = rec.rnode.relnode; + let XlSmgrTruncate { + blkno, + rnode, + flags, + } = truncate; - if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 { + let spcnode = rnode.spcnode; + let dbnode = rnode.dbnode; + let relnode = rnode.relnode; + + if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 { let rel = RelTag { spcnode, dbnode, relnode, forknum: MAIN_FORKNUM, }; - self.put_rel_truncation(modification, rel, rec.blkno, ctx) + + self.put_rel_truncation(modification, rel, blkno, ctx) .await?; } - if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 { + if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 { let rel = RelTag { spcnode, dbnode, @@ -1396,9 +1449,9 @@ impl WalIngest { forknum: FSM_FORKNUM, }; - let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE; + let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE; let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no); - if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 { + if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 { // Tail of last remaining FSM page has to be zeroed. // We are not precise here and instead of digging in FSM bitmap format just clear the whole page. modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?; @@ -1411,7 +1464,7 @@ impl WalIngest { .await?; } } - if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 { + if flags & pg_constants::SMGR_TRUNCATE_VM != 0 { let rel = RelTag { spcnode, dbnode, @@ -1419,8 +1472,8 @@ impl WalIngest { forknum: VISIBILITYMAP_FORKNUM, }; - let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE; - if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 { + let mut vm_page_no = blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE; + if blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 { // Tail of last remaining vm page has to be zeroed. // We are not precise here and instead of digging in VM bitmap format just clear the whole page. modification.put_rel_page_image_zero(rel, vm_page_no)?; @@ -1493,12 +1546,32 @@ impl WalIngest { /// async fn ingest_xact_record( &mut self, + record: XactRecord, modification: &mut DatadirModification<'_>, - parsed: &XlXactParsedRecord, - is_commit: bool, - origin_id: u16, ctx: &RequestContext, ) -> anyhow::Result<()> { + let (xact_common, is_commit, is_prepared) = match record { + XactRecord::Prepare(XactPrepare { xl_xid, data }) => { + let xid: u64 = if modification.tline.pg_version >= 17 { + self.adjust_to_full_transaction_id(xl_xid)? + } else { + xl_xid as u64 + }; + return modification.put_twophase_file(xid, data, ctx).await; + } + XactRecord::Commit(common) => (common, true, false), + XactRecord::Abort(common) => (common, false, false), + XactRecord::CommitPrepared(common) => (common, true, true), + XactRecord::AbortPrepared(common) => (common, false, true), + }; + + let XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + } = xact_common; + // Record update of CLOG pages let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE; let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; @@ -1569,18 +1642,95 @@ impl WalIngest { .set_replorigin(origin_id, parsed.origin_lsn) .await?; } + + if is_prepared { + // Remove twophase file. see RemoveTwoPhaseFile() in postgres code + trace!( + "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}", + xl_xid, + parsed.xid, + lsn, + ); + + let xid: u64 = if modification.tline.pg_version >= 17 { + self.adjust_to_full_transaction_id(parsed.xid)? + } else { + parsed.xid as u64 + }; + modification.drop_twophase_file(xid, ctx).await?; + } + Ok(()) } - async fn ingest_clog_truncate_record( + // TODO(vlad): Standardise interface for `decode_...` + fn decode_xact_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + lsn: Lsn, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; + let origin_id = decoded.origin_id; + let xl_xid = decoded.xl_xid; + + if info == pg_constants::XLOG_XACT_COMMIT { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(XactRecord::Commit(XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + }))); + } else if info == pg_constants::XLOG_XACT_ABORT { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(XactRecord::Abort(XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + }))); + } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(XactRecord::CommitPrepared(XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + }))); + } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(XactRecord::AbortPrepared(XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + }))); + } else if info == pg_constants::XLOG_XACT_PREPARE { + return Ok(Some(XactRecord::Prepare(XactPrepare { + xl_xid: decoded.xl_xid, + data: Bytes::copy_from_slice(&buf[..]), + }))); + } + + Ok(None) + } + + async fn ingest_clog_truncate( &mut self, + truncate: ClogTruncate, modification: &mut DatadirModification<'_>, - xlrec: &XlClogTruncate, ctx: &RequestContext, ) -> anyhow::Result<()> { + let ClogTruncate { + pageno, + oldest_xid, + oldest_xid_db, + } = truncate; + info!( "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}", - xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db + pageno, oldest_xid, oldest_xid_db ); // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is @@ -1588,8 +1738,8 @@ impl WalIngest { // later. In Neon, a server can start at any LSN, not just on a checkpoint record, // so we keep the oldestXid and oldestXidDB up-to-date. enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { - cp.oldestXid = xlrec.oldest_xid; - cp.oldestXidDB = xlrec.oldest_xid_db; + cp.oldestXid = oldest_xid; + cp.oldestXidDB = oldest_xid_db; }); self.checkpoint_modified = true; @@ -1606,7 +1756,7 @@ impl WalIngest { // the current endpoint page must not be eligible for removal. // See SimpleLruTruncate() in slru.c if dispatch_pgversion!(modification.tline.pg_version, { - pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, xlrec.pageno) + pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno) }) { info!("could not truncate directory pg_xact apparent wraparound"); return Ok(()); @@ -1626,7 +1776,7 @@ impl WalIngest { let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; let may_delete = dispatch_pgversion!(modification.tline.pg_version, { - pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, xlrec.pageno) + pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno) }); if may_delete { @@ -1640,7 +1790,55 @@ impl WalIngest { Ok(()) } - fn ingest_multixact_create_record( + async fn ingest_clog_zero_page( + &mut self, + zero_page: ClogZeroPage, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let ClogZeroPage { segno, rpageno } = zero_page; + + self.put_slru_page_image( + modification, + SlruKind::Clog, + segno, + rpageno, + ZERO_PAGE.clone(), + ctx, + ) + .await + } + + fn decode_clog_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; + + if info == pg_constants::CLOG_ZEROPAGE { + let pageno = if pg_version < 17 { + buf.get_u32_le() + } else { + buf.get_u64_le() as u32 + }; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + Ok(Some(ClogRecord::ZeroPage(ClogZeroPage { segno, rpageno }))) + } else { + assert!(info == pg_constants::CLOG_TRUNCATE); + let xlrec = XlClogTruncate::decode(buf, pg_version); + + Ok(Some(ClogRecord::Truncate(ClogTruncate { + pageno: xlrec.pageno, + oldest_xid: xlrec.oldest_xid, + oldest_xid_db: xlrec.oldest_xid_db, + }))) + } + } + + fn ingest_multixact_create( &mut self, modification: &mut DatadirModification, xlrec: &XlMultiXactCreate, @@ -1742,7 +1940,7 @@ impl WalIngest { Ok(()) } - async fn ingest_multixact_truncate_record( + async fn ingest_multixact_truncate( &mut self, modification: &mut DatadirModification<'_>, xlrec: &XlMultiXactTruncate, @@ -1788,26 +1986,315 @@ impl WalIngest { Ok(()) } - async fn ingest_relmap_page( + async fn ingest_multixact_zero_page( &mut self, + zero_page: MultiXactZeroPage, modification: &mut DatadirModification<'_>, - xlrec: &XlRelmapUpdate, - decoded: &DecodedWALRecord, ctx: &RequestContext, ) -> Result<()> { + let MultiXactZeroPage { + slru_kind, + segno, + rpageno, + } = zero_page; + self.put_slru_page_image( + modification, + slru_kind, + segno, + rpageno, + ZERO_PAGE.clone(), + ctx, + ) + .await + } + + fn decode_multixact_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE + || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE + { + let pageno = if pg_version < 17 { + buf.get_u32_le() + } else { + buf.get_u64_le() as u32 + }; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + let slru_kind = match info { + pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets, + pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers, + _ => unreachable!(), + }; + + return Ok(Some(MultiXactRecord::ZeroPage(MultiXactZeroPage { + slru_kind, + segno, + rpageno, + }))); + } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(buf); + return Ok(Some(MultiXactRecord::Create(xlrec))); + } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { + let xlrec = XlMultiXactTruncate::decode(buf); + return Ok(Some(MultiXactRecord::Truncate(xlrec))); + } + + Ok(None) + } + + async fn ingest_relmap_update( + &mut self, + update: RelmapUpdate, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> Result<()> { + let RelmapUpdate { update, buf } = update; + + modification + .put_relmap_file(update.tsid, update.dbid, buf, ctx) + .await + } + + fn decode_relmap_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let update = XlRelmapUpdate::decode(buf); + let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); // skip xl_relmap_update buf.advance(12); - modification - .put_relmap_file( - xlrec.tsid, - xlrec.dbid, - Bytes::copy_from_slice(&buf[..]), - ctx, - ) - .await + Ok(Some(RelmapRecord::Update(RelmapUpdate { + update, + buf: Bytes::copy_from_slice(&buf[..]), + }))) + } + + async fn ingest_raw_xlog_record( + &mut self, + raw_record: RawXlogRecord, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> Result<()> { + let RawXlogRecord { info, lsn, mut buf } = raw_record; + let pg_version = modification.tline.pg_version; + + if info == pg_constants::XLOG_PARAMETER_CHANGE { + if let CheckPoint::V17(cp) = &mut self.checkpoint { + let rec = v17::XlParameterChange::decode(&mut buf); + cp.wal_level = rec.wal_level; + self.checkpoint_modified = true; + } + } else if info == pg_constants::XLOG_END_OF_RECOVERY { + if let CheckPoint::V17(cp) = &mut self.checkpoint { + let rec = v17::XlEndOfRecovery::decode(&mut buf); + cp.wal_level = rec.wal_level; + self.checkpoint_modified = true; + } + } + + enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { + if info == pg_constants::XLOG_NEXTOID { + let next_oid = buf.get_u32_le(); + if cp.nextOid != next_oid { + cp.nextOid = next_oid; + self.checkpoint_modified = true; + } + } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE + || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN + { + let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT]; + buf.copy_to_slice(&mut checkpoint_bytes); + let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?; + trace!( + "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", + xlog_checkpoint.oldestXid, + cp.oldestXid + ); + if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 { + cp.oldestXid = xlog_checkpoint.oldestXid; + } + trace!( + "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}", + xlog_checkpoint.oldestActiveXid, + cp.oldestActiveXid + ); + + // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`, + // because at shutdown, all in-progress transactions will implicitly + // end. Postgres startup code knows that, and allows hot standby to start + // immediately from a shutdown checkpoint. + // + // In Neon, Postgres hot standby startup always behaves as if starting from + // an online checkpoint. It needs a valid `oldestActiveXid` value, so + // instead of overwriting self.checkpoint.oldestActiveXid with + // InvalidTransactionid from the checkpoint WAL record, update it to a + // proper value, knowing that there are no in-progress transactions at this + // point, except for prepared transactions. + // + // See also the neon code changes in the InitWalRecovery() function. + if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID + && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN + { + let oldest_active_xid = if pg_version >= 17 { + let mut oldest_active_full_xid = cp.nextXid.value; + for xid in modification.tline.list_twophase_files(lsn, ctx).await? { + if xid < oldest_active_full_xid { + oldest_active_full_xid = xid; + } + } + oldest_active_full_xid as u32 + } else { + let mut oldest_active_xid = cp.nextXid.value as u32; + for xid in modification.tline.list_twophase_files(lsn, ctx).await? { + let narrow_xid = xid as u32; + if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 { + oldest_active_xid = narrow_xid; + } + } + oldest_active_xid + }; + cp.oldestActiveXid = oldest_active_xid; + } else { + cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid; + } + + // Write a new checkpoint key-value pair on every checkpoint record, even + // if nothing really changed. Not strictly required, but it seems nice to + // have some trace of the checkpoint records in the layer files at the same + // LSNs. + self.checkpoint_modified = true; + } + }); + + Ok(()) + } + + fn decode_xlog_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + lsn: Lsn, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + Ok(Some(XlogRecord::Raw(RawXlogRecord { + info, + lsn, + buf: buf.clone(), + }))) + } + + async fn ingest_logical_message_put( + &mut self, + put: PutLogicalMessage, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> Result<()> { + let PutLogicalMessage { path, buf } = put; + modification.put_file(path.as_str(), &buf, ctx).await + } + + fn decode_logical_message_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_LOGICAL_MESSAGE { + let xlrec = crate::walrecord::XlLogicalMessage::decode(buf); + let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?; + + #[cfg(feature = "testing")] + if prefix == "neon-test" { + return Ok(Some(LogicalMessageRecord::Failpoint)); + } + + if let Some(path) = prefix.strip_prefix("neon-file:") { + let buf_size = xlrec.prefix_size + xlrec.message_size; + let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]); + return Ok(Some(LogicalMessageRecord::Put(PutLogicalMessage { + path: path.to_string(), + buf, + }))); + } + } + + Ok(None) + } + + fn decode_standby_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_RUNNING_XACTS { + let xlrec = crate::walrecord::XlRunningXacts::decode(buf); + return Ok(Some(StandbyRecord::RunningXacts(StandbyRunningXacts { + oldest_running_xid: xlrec.oldest_running_xid, + }))); + } + + Ok(None) + } + + fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> { + match record { + StandbyRecord::RunningXacts(running_xacts) => { + enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { + cp.oldestActiveXid = running_xacts.oldest_running_xid; + }); + + self.checkpoint_modified = true; + } + } + + Ok(()) + } + + fn decode_replorigin_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_REPLORIGIN_SET { + let xlrec = crate::walrecord::XlReploriginSet::decode(buf); + return Ok(Some(ReploriginRecord::Set(xlrec))); + } else if info == pg_constants::XLOG_REPLORIGIN_DROP { + let xlrec = crate::walrecord::XlReploriginDrop::decode(buf); + return Ok(Some(ReploriginRecord::Drop(xlrec))); + } + + Ok(None) + } + + async fn ingest_replorigin_record( + &mut self, + record: ReploriginRecord, + modification: &mut DatadirModification<'_>, + ) -> Result<()> { + match record { + ReploriginRecord::Set(set) => { + modification + .set_replorigin(set.node_id, set.remote_lsn) + .await?; + } + ReploriginRecord::Drop(drop) => { + modification.drop_replorigin(drop.node_id).await?; + } + } + + Ok(()) } async fn put_rel_creation( diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py index 5561a128b7..fc9adb14c9 100644 --- a/test_runner/regress/test_tenant_relocation.py +++ b/test_runner/regress/test_tenant_relocation.py @@ -435,7 +435,9 @@ def test_emergency_relocate_with_branches_slow_replay( # This fail point will pause the WAL ingestion on the main branch, after the # the first insert - pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")]) + pageserver_http.configure_failpoints( + [("pageserver-wal-ingest-logical-message-sleep", "return(5000)")] + ) # Attach and wait a few seconds to give it time to load the tenants, attach to the # safekeepers, and to stream and ingest the WAL up to the pause-point. @@ -453,11 +455,13 @@ def test_emergency_relocate_with_branches_slow_replay( assert cur.fetchall() == [("before pause",), ("after pause",)] # Sanity check that the failpoint was reached - env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + env.pageserver.assert_log_contains( + 'failpoint "pageserver-wal-ingest-logical-message-sleep": sleep done' + ) assert time.time() - before_attach_time > 5 # Clean up - pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off")) + pageserver_http.configure_failpoints(("pageserver-wal-ingest-logical-message-sleep", "off")) # Simulate hard crash of pageserver and re-attach a tenant with a branch @@ -581,7 +585,9 @@ def test_emergency_relocate_with_branches_createdb( # bug reproduced easily even without this, as there is always some delay between # loading the timeline and establishing the connection to the safekeeper to stream and # ingest the WAL, but let's make this less dependent on accidental timing. - pageserver_http.configure_failpoints([("wal-ingest-logical-message-sleep", "return(5000)")]) + pageserver_http.configure_failpoints( + [("pageserver-wal-ingest-logical-message-sleep", "return(5000)")] + ) before_attach_time = time.time() env.pageserver.tenant_attach(tenant_id) @@ -590,8 +596,10 @@ def test_emergency_relocate_with_branches_createdb( assert query_scalar(cur, "SELECT count(*) FROM test_migrate_one") == 200 # Sanity check that the failpoint was reached - env.pageserver.assert_log_contains('failpoint "wal-ingest-logical-message-sleep": sleep done') + env.pageserver.assert_log_contains( + 'failpoint "pageserver-wal-ingest-logical-message-sleep": sleep done' + ) assert time.time() - before_attach_time > 5 # Clean up - pageserver_http.configure_failpoints(("wal-ingest-logical-message-sleep", "off")) + pageserver_http.configure_failpoints(("pageserver-wal-ingest-logical-message-sleep", "off"))