pageserver: refactor replorigin record

This commit is contained in:
Vlad Lazar
2024-10-19 14:36:58 +02:00
parent d37d6f5fd0
commit 345ae45faa

View File

@@ -225,6 +225,11 @@ struct StandbyRunningXacts {
oldest_running_xid: u32,
}
enum ReploriginRecord {
Set(XlReploriginSet),
Drop(XlReploriginDrop),
}
impl WalIngest {
pub async fn new(
timeline: &Timeline,
@@ -433,15 +438,11 @@ impl WalIngest {
}
}
pg_constants::RM_REPLORIGIN_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_REPLORIGIN_SET {
let xlrec = crate::walrecord::XlReploriginSet::decode(&mut buf);
modification
.set_replorigin(xlrec.node_id, xlrec.remote_lsn)
.await?
} else if info == pg_constants::XLOG_REPLORIGIN_DROP {
let xlrec = crate::walrecord::XlReploriginDrop::decode(&mut buf);
modification.drop_replorigin(xlrec.node_id).await?
let maybe_replorigin_record =
Self::decode_replorigin_record(&mut buf, &decoded, pg_version).unwrap();
if let Some(replorigin_record) = maybe_replorigin_record {
self.ingest_replorigin_record(replorigin_record, modification)
.await?;
}
}
_x => {
@@ -2325,6 +2326,42 @@ impl WalIngest {
Ok(())
}
fn decode_replorigin_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
_pg_version: u32,
) -> anyhow::Result<Option<ReploriginRecord>> {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_REPLORIGIN_SET {
let xlrec = crate::walrecord::XlReploriginSet::decode(buf);
return Ok(Some(ReploriginRecord::Set(xlrec)));
} else if info == pg_constants::XLOG_REPLORIGIN_DROP {
let xlrec = crate::walrecord::XlReploriginDrop::decode(buf);
return Ok(Some(ReploriginRecord::Drop(xlrec)));
}
Ok(None)
}
async fn ingest_replorigin_record(
&mut self,
record: ReploriginRecord,
modification: &mut DatadirModification<'_>,
) -> Result<()> {
match record {
ReploriginRecord::Set(set) => {
modification
.set_replorigin(set.node_id, set.remote_lsn)
.await?;
}
ReploriginRecord::Drop(drop) => {
modification.drop_replorigin(drop.node_id).await?;
}
}
Ok(())
}
async fn put_rel_creation(
&mut self,
modification: &mut DatadirModification<'_>,