pageserver: refactor xact records

This one is a bit less obvious than the previous ones.

I merged some of the logic that was previously in
`WalIngest::ingest_record` to `WalIngest::ingest_xact_record`.
This commit is contained in:
Vlad Lazar
2024-10-16 20:32:05 +02:00
parent 6fdba1a427
commit 813bd0213d

View File

@@ -168,6 +168,27 @@ struct ClogTruncate {
oldest_xid_db: u32,
}
enum XactRecord {
Commit(XactCommon),
Abort(XactCommon),
CommitPrepared(XactCommon),
AbortPrepared(XactCommon),
Prepare(XactPrepare),
}
struct XactCommon {
parsed: XlXactParsedRecord,
origin_id: u16,
// Fields below are only used for logging
xl_xid: u32,
lsn: Lsn,
}
struct XactPrepare {
xl_xid: u32,
data: Bytes,
}
impl WalIngest {
pub async fn new(
timeline: &Timeline,
@@ -313,54 +334,10 @@ impl WalIngest {
}
}
pg_constants::RM_XACT_ID => {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
decoded.origin_id,
ctx,
)
.await?;
} else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
|| info == pg_constants::XLOG_XACT_ABORT_PREPARED
{
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
decoded.origin_id,
ctx,
)
.await?;
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
trace!(
"Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
decoded.xl_xid,
parsed_xact.xid,
lsn,
);
let xid: u64 = if pg_version >= 17 {
self.adjust_to_full_transaction_id(parsed_xact.xid)?
} else {
parsed_xact.xid as u64
};
modification.drop_twophase_file(xid, ctx).await?;
} else if info == pg_constants::XLOG_XACT_PREPARE {
let xid: u64 = if pg_version >= 17 {
self.adjust_to_full_transaction_id(decoded.xl_xid)?
} else {
decoded.xl_xid as u64
};
modification
.put_twophase_file(xid, Bytes::copy_from_slice(&buf[..]), ctx)
let maybe_xact_record =
Self::decode_xact_record(&mut buf, &decoded, lsn, pg_version).unwrap();
if let Some(xact_record) = maybe_xact_record {
self.ingest_xact_record(xact_record, modification, ctx)
.await?;
}
}
@@ -1709,12 +1686,32 @@ impl WalIngest {
///
async fn ingest_xact_record(
&mut self,
record: XactRecord,
modification: &mut DatadirModification<'_>,
parsed: &XlXactParsedRecord,
is_commit: bool,
origin_id: u16,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let (xact_common, is_commit, is_prepared) = match record {
XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
let xid: u64 = if modification.tline.pg_version >= 17 {
self.adjust_to_full_transaction_id(xl_xid)?
} else {
xl_xid as u64
};
return modification.put_twophase_file(xid, data, ctx).await;
}
XactRecord::Commit(common) => (common, true, false),
XactRecord::Abort(common) => (common, false, false),
XactRecord::CommitPrepared(common) => (common, true, true),
XactRecord::AbortPrepared(common) => (common, false, true),
};
let XactCommon {
parsed,
origin_id,
xl_xid,
lsn,
} = xact_common;
// Record update of CLOG pages
let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
@@ -1785,9 +1782,80 @@ impl WalIngest {
.set_replorigin(origin_id, parsed.origin_lsn)
.await?;
}
if is_prepared {
// Remove twophase file. see RemoveTwoPhaseFile() in postgres code
trace!(
"Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
xl_xid,
parsed.xid,
lsn,
);
let xid: u64 = if modification.tline.pg_version >= 17 {
self.adjust_to_full_transaction_id(parsed.xid)?
} else {
parsed.xid as u64
};
modification.drop_twophase_file(xid, ctx).await?;
}
Ok(())
}
// TODO(vlad): Standardise interface for `decode_...`
fn decode_xact_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
lsn: Lsn,
_pg_version: u32,
) -> anyhow::Result<Option<XactRecord>> {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
let origin_id = decoded.origin_id;
let xl_xid = decoded.xl_xid;
if info == pg_constants::XLOG_XACT_COMMIT {
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
return Ok(Some(XactRecord::Commit(XactCommon {
parsed,
origin_id,
xl_xid,
lsn,
})));
} else if info == pg_constants::XLOG_XACT_ABORT {
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
return Ok(Some(XactRecord::Abort(XactCommon {
parsed,
origin_id,
xl_xid,
lsn,
})));
} else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
return Ok(Some(XactRecord::CommitPrepared(XactCommon {
parsed,
origin_id,
xl_xid,
lsn,
})));
} else if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info);
return Ok(Some(XactRecord::AbortPrepared(XactCommon {
parsed,
origin_id,
xl_xid,
lsn,
})));
} else if info == pg_constants::XLOG_XACT_PREPARE {
return Ok(Some(XactRecord::Prepare(XactPrepare {
xl_xid: decoded.xl_xid,
data: Bytes::copy_from_slice(&buf[..]),
})));
}
Ok(None)
}
async fn ingest_clog_truncate_record(
&mut self,
truncate: ClogTruncate,