diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index 90c31aea93..e94fba38c6 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -168,6 +168,31 @@ struct ClogTruncate {
     oldest_xid_db: u32,
 }
 
+/// A decoded `RM_XACT_ID` WAL record, classified by its operation.
+enum XactRecord {
+    Commit(XactCommon),
+    Abort(XactCommon),
+    CommitPrepared(XactCommon),
+    AbortPrepared(XactCommon),
+    Prepare(XactPrepare),
+}
+
+/// Payload shared by the commit and abort variants of [`XactRecord`].
+struct XactCommon {
+    parsed: XlXactParsedRecord,
+    origin_id: u16,
+    // Fields below are only used for logging
+    xl_xid: u32,
+    lsn: Lsn,
+}
+
+/// Payload of an `XLOG_XACT_PREPARE` record.
+struct XactPrepare {
+    xl_xid: u32,
+    /// Copy of the undecoded record bytes, handed to `put_twophase_file`.
+    data: Bytes,
+}
+
 impl WalIngest {
     pub async fn new(
         timeline: &Timeline,
@@ -313,54 +338,10 @@ impl WalIngest {
                 }
             }
             pg_constants::RM_XACT_ID => {
-                let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
-
-                if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
-                    let parsed_xact =
-                        XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
-                    self.ingest_xact_record(
-                        modification,
-                        &parsed_xact,
-                        info == pg_constants::XLOG_XACT_COMMIT,
-                        decoded.origin_id,
-                        ctx,
-                    )
-                    .await?;
-                } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
-                    || info == pg_constants::XLOG_XACT_ABORT_PREPARED
-                {
-                    let parsed_xact =
-                        XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
-                    self.ingest_xact_record(
-                        modification,
-                        &parsed_xact,
-                        info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
-                        decoded.origin_id,
-                        ctx,
-                    )
-                    .await?;
-                    // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
-                    trace!(
-                        "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
-                        decoded.xl_xid,
-                        parsed_xact.xid,
-                        lsn,
-                    );
-
-                    let xid: u64 = if pg_version >= 17 {
-                        self.adjust_to_full_transaction_id(parsed_xact.xid)?
-                    } else {
-                        parsed_xact.xid as u64
-                    };
-                    modification.drop_twophase_file(xid, ctx).await?;
-                } else if info == pg_constants::XLOG_XACT_PREPARE {
-                    let xid: u64 = if pg_version >= 17 {
-                        self.adjust_to_full_transaction_id(decoded.xl_xid)?
-                    } else {
-                        decoded.xl_xid as u64
-                    };
-                    modification
-                        .put_twophase_file(xid, Bytes::copy_from_slice(&buf[..]), ctx)
+                let maybe_xact_record =
+                    Self::decode_xact_record(&mut buf, &decoded, lsn, pg_version)?;
+                if let Some(xact_record) = maybe_xact_record {
+                    self.ingest_xact_record(xact_record, modification, ctx)
                         .await?;
                 }
             }
@@ -1709,12 +1690,34 @@ impl WalIngest {
     ///
     async fn ingest_xact_record(
         &mut self,
+        record: XactRecord,
         modification: &mut DatadirModification<'_>,
-        parsed: &XlXactParsedRecord,
-        is_commit: bool,
-        origin_id: u16,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
+        // PREPARE only writes the two-phase file and returns early; all other
+        // records fall through to the CLOG update below.
+        let (xact_common, is_commit, is_prepared) = match record {
+            XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
+                let xid: u64 = if modification.tline.pg_version >= 17 {
+                    self.adjust_to_full_transaction_id(xl_xid)?
+                } else {
+                    xl_xid as u64
+                };
+                return modification.put_twophase_file(xid, data, ctx).await;
+            }
+            XactRecord::Commit(common) => (common, true, false),
+            XactRecord::Abort(common) => (common, false, false),
+            XactRecord::CommitPrepared(common) => (common, true, true),
+            XactRecord::AbortPrepared(common) => (common, false, true),
+        };
+
+        let XactCommon {
+            parsed,
+            origin_id,
+            xl_xid,
+            lsn,
+        } = xact_common;
+
         // Record update of CLOG pages
         let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
         let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
@@ -1785,9 +1788,67 @@ impl WalIngest {
                 .set_replorigin(origin_id, parsed.origin_lsn)
                 .await?;
         }
+
+        if is_prepared {
+            // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
+            trace!(
+                "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
+                xl_xid,
+                parsed.xid,
+                lsn,
+            );
+
+            let xid: u64 = if modification.tline.pg_version >= 17 {
+                self.adjust_to_full_transaction_id(parsed.xid)?
+            } else {
+                parsed.xid as u64
+            };
+            modification.drop_twophase_file(xid, ctx).await?;
+        }
+
         Ok(())
     }
 
+    // TODO(vlad): Standardise interface for `decode_...`
+    /// Decode the body of an `RM_XACT_ID` WAL record into an [`XactRecord`].
+    ///
+    /// Returns `Ok(None)` when the operation bits of `xl_info` match none of the
+    /// xact operations handled here.
+    fn decode_xact_record(
+        buf: &mut Bytes,
+        decoded: &DecodedWALRecord,
+        lsn: Lsn,
+        _pg_version: u32,
+    ) -> anyhow::Result<Option<XactRecord>> {
+        let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
+
+        // PREPARE keeps the raw record body; it is not parsed here.
+        if info == pg_constants::XLOG_XACT_PREPARE {
+            return Ok(Some(XactRecord::Prepare(XactPrepare {
+                xl_xid: decoded.xl_xid,
+                data: Bytes::copy_from_slice(&buf[..]),
+            })));
+        }
+
+        // Commit/abort records (prepared or not) share one parsed payload and
+        // differ only in the variant that wraps it.
+        let variant: fn(XactCommon) -> XactRecord = match info {
+            pg_constants::XLOG_XACT_COMMIT => XactRecord::Commit,
+            pg_constants::XLOG_XACT_ABORT => XactRecord::Abort,
+            pg_constants::XLOG_XACT_COMMIT_PREPARED => XactRecord::CommitPrepared,
+            pg_constants::XLOG_XACT_ABORT_PREPARED => XactRecord::AbortPrepared,
+            _ => return Ok(None),
+        };
+
+        let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
+        Ok(Some(variant(XactCommon {
+            parsed,
+            origin_id: decoded.origin_id,
+            xl_xid: decoded.xl_xid,
+            lsn,
+        })))
+    }
+
     async fn ingest_clog_truncate_record(
         &mut self,
         truncate: ClogTruncate,