mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
Detect when a checkpoint is modified in a smarter way.
Previously, the WAL receiver we would make a decoded copy of the current Checkpoint before each WAL record, and compare it with the Checkpoint after the record has been processed. If it has changed, the checkpoint relish is updated in the repository. That's somewhat expensive, the Checkpoint::encode() function is visible in 'perf' profile. Change that so that we set a flag whenever the Checkpoint struct is modified, so that we dont need to compare the whole struct anymore.
This commit is contained in:
@@ -286,12 +286,15 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Pa
|
||||
///
|
||||
pub fn save_decoded_record(
|
||||
checkpoint: &mut CheckPoint,
|
||||
checkpoint_modified: &mut bool,
|
||||
timeline: &dyn Timeline,
|
||||
decoded: &DecodedWALRecord,
|
||||
recdata: Bytes,
|
||||
lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
checkpoint.update_next_xid(decoded.xl_xid);
|
||||
if checkpoint.update_next_xid(decoded.xl_xid) {
|
||||
*checkpoint_modified = true;
|
||||
}
|
||||
|
||||
// Iterate through all the blocks that the record modifies, and
|
||||
// "put" a separate copy of the record for each block.
|
||||
@@ -375,7 +378,7 @@ pub fn save_decoded_record(
|
||||
} else {
|
||||
assert!(info == pg_constants::CLOG_TRUNCATE);
|
||||
let xlrec = XlClogTruncate::decode(&mut buf);
|
||||
save_clog_truncate_record(checkpoint, timeline, lsn, &xlrec)?;
|
||||
save_clog_truncate_record(checkpoint, checkpoint_modified, timeline, lsn, &xlrec)?;
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
@@ -444,10 +447,17 @@ pub fn save_decoded_record(
|
||||
)?;
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
|
||||
let xlrec = XlMultiXactCreate::decode(&mut buf);
|
||||
save_multixact_create_record(checkpoint, timeline, lsn, &xlrec, decoded)?;
|
||||
save_multixact_create_record(
|
||||
checkpoint,
|
||||
checkpoint_modified,
|
||||
timeline,
|
||||
lsn,
|
||||
&xlrec,
|
||||
decoded,
|
||||
)?;
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
|
||||
let xlrec = XlMultiXactTruncate::decode(&mut buf);
|
||||
save_multixact_truncate_record(checkpoint, timeline, lsn, &xlrec)?;
|
||||
save_multixact_truncate_record(checkpoint, checkpoint_modified, timeline, lsn, &xlrec)?;
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
|
||||
let xlrec = XlRelmapUpdate::decode(&mut buf);
|
||||
@@ -456,7 +466,10 @@ pub fn save_decoded_record(
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_NEXTOID {
|
||||
let next_oid = buf.get_u32_le();
|
||||
checkpoint.nextOid = next_oid;
|
||||
if checkpoint.nextOid != next_oid {
|
||||
checkpoint.nextOid = next_oid;
|
||||
*checkpoint_modified = true;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|
||||
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
@@ -472,6 +485,7 @@ pub fn save_decoded_record(
|
||||
);
|
||||
if (checkpoint.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
|
||||
checkpoint.oldestXid = xlog_checkpoint.oldestXid;
|
||||
*checkpoint_modified = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -675,6 +689,7 @@ fn save_xact_record(
|
||||
|
||||
fn save_clog_truncate_record(
|
||||
checkpoint: &mut CheckPoint,
|
||||
checkpoint_modified: &mut bool,
|
||||
timeline: &dyn Timeline,
|
||||
lsn: Lsn,
|
||||
xlrec: &XlClogTruncate,
|
||||
@@ -693,6 +708,7 @@ fn save_clog_truncate_record(
|
||||
// TODO Figure out if there will be any issues with replica.
|
||||
checkpoint.oldestXid = xlrec.oldest_xid;
|
||||
checkpoint.oldestXidDB = xlrec.oldest_xid_db;
|
||||
*checkpoint_modified = true;
|
||||
|
||||
// TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
|
||||
|
||||
@@ -735,6 +751,7 @@ fn save_clog_truncate_record(
|
||||
|
||||
fn save_multixact_create_record(
|
||||
checkpoint: &mut CheckPoint,
|
||||
checkpoint_modified: &mut bool,
|
||||
timeline: &dyn Timeline,
|
||||
lsn: Lsn,
|
||||
xlrec: &XlMultiXactCreate,
|
||||
@@ -791,9 +808,11 @@ fn save_multixact_create_record(
|
||||
}
|
||||
if xlrec.mid >= checkpoint.nextMulti {
|
||||
checkpoint.nextMulti = xlrec.mid + 1;
|
||||
*checkpoint_modified = true;
|
||||
}
|
||||
if xlrec.moff + xlrec.nmembers > checkpoint.nextMultiOffset {
|
||||
checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
|
||||
*checkpoint_modified = true;
|
||||
}
|
||||
let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
|
||||
if mbr.xid.wrapping_sub(acc) as i32 > 0 {
|
||||
@@ -803,18 +822,22 @@ fn save_multixact_create_record(
|
||||
}
|
||||
});
|
||||
|
||||
checkpoint.update_next_xid(max_mbr_xid);
|
||||
if checkpoint.update_next_xid(max_mbr_xid) {
|
||||
*checkpoint_modified = true;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn save_multixact_truncate_record(
|
||||
checkpoint: &mut CheckPoint,
|
||||
checkpoint_modified: &mut bool,
|
||||
timeline: &dyn Timeline,
|
||||
lsn: Lsn,
|
||||
xlrec: &XlMultiXactTruncate,
|
||||
) -> Result<()> {
|
||||
checkpoint.oldestMulti = xlrec.end_trunc_off;
|
||||
checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
|
||||
*checkpoint_modified = true;
|
||||
|
||||
// PerformMembersTruncation
|
||||
let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
|
||||
|
||||
@@ -214,26 +214,27 @@ fn walreceiver_main(
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let _enter = info_span!("processing record", lsn = %lsn).entered();
|
||||
|
||||
// Save old checkpoint value to compare with it after decoding WAL record
|
||||
let old_checkpoint_bytes = checkpoint.encode();
|
||||
let decoded = decode_wal_record(recdata.clone());
|
||||
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
// at risk of hittind a deadlock.
|
||||
assert!(lsn.is_aligned());
|
||||
|
||||
let mut checkpoint_modified = false;
|
||||
|
||||
let decoded = decode_wal_record(recdata.clone());
|
||||
restore_local_repo::save_decoded_record(
|
||||
&mut checkpoint,
|
||||
&mut checkpoint_modified,
|
||||
&*timeline,
|
||||
&decoded,
|
||||
recdata,
|
||||
lsn,
|
||||
)?;
|
||||
|
||||
let new_checkpoint_bytes = checkpoint.encode();
|
||||
// Check if checkpoint data was updated by save_decoded_record
|
||||
if new_checkpoint_bytes != old_checkpoint_bytes {
|
||||
if checkpoint_modified {
|
||||
let new_checkpoint_bytes = checkpoint.encode();
|
||||
|
||||
timeline.put_page_image(
|
||||
RelishTag::Checkpoint,
|
||||
0,
|
||||
|
||||
@@ -377,10 +377,12 @@ impl CheckPoint {
|
||||
Ok(CheckPoint::des(buf)?)
|
||||
}
|
||||
|
||||
// Update next XID based on provided new_xid and stored epoch.
|
||||
// Next XID should be greater than new_xid.
|
||||
// Also take in account 32-bit wrap-around.
|
||||
pub fn update_next_xid(&mut self, xid: u32) {
|
||||
/// Update next XID based on provided new_xid and stored epoch.
|
||||
/// Next XID should be greater than new_xid. This handles 32-bit
|
||||
/// XID wraparound correctly.
|
||||
///
|
||||
/// Returns 'true' if the XID was updated.
|
||||
pub fn update_next_xid(&mut self, xid: u32) -> bool {
|
||||
let xid = xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
|
||||
let full_xid = self.nextXid.value;
|
||||
let new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
|
||||
@@ -391,10 +393,14 @@ impl CheckPoint {
|
||||
// wrap-around
|
||||
epoch += 1;
|
||||
}
|
||||
self.nextXid = FullTransactionId {
|
||||
value: (epoch << 32) | new_xid as u64,
|
||||
};
|
||||
let nextXid = (epoch << 32) | new_xid as u64;
|
||||
|
||||
if nextXid != self.nextXid.value {
|
||||
self.nextXid = FullTransactionId { value: nextXid };
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user