Fix CLOG truncate handling in case of wraparound.

This commit is contained in:
anastasia
2021-08-06 14:05:28 +03:00
committed by lubennikovaav
parent 949ac54401
commit c99a211b01
4 changed files with 98 additions and 32 deletions

View File

@@ -100,9 +100,12 @@ impl<'a> Basebackup<'a> {
.timeline
.get_relish_size(RelishTag::Slru { slru, segno }, self.lsn)?;
if seg_size == None
{
info!("SLRU segment {}/{:>04X} was truncated", slru.to_str(), segno);
if seg_size == None {
trace!(
"SLRU segment {}/{:>04X} was truncated",
slru.to_str(),
segno
);
return Ok(());
}

View File

@@ -338,7 +338,10 @@ impl Timeline for ObjectTimeline {
/// Return None if the relish was truncated
fn get_relish_size(&self, rel: RelishTag, lsn: Lsn) -> Result<Option<u32>> {
if !rel.is_blocky() {
bail!("invalid get_relish_size request for non-blocky relish {}", rel);
bail!(
"invalid get_relish_size request for non-blocky relish {}",
rel
);
}
let lsn = self.wait_lsn(lsn)?;
@@ -577,10 +580,9 @@ impl Timeline for ObjectTimeline {
/// To truncate SLRU segments use put_unlink() function.
///
fn put_truncation(&self, rel: RelishTag, lsn: Lsn, nblocks: u32) -> Result<()> {
match rel {
RelishTag::Relation(_) => {},
_ => bail!("invalid truncation for non-rel relish {}", rel)
RelishTag::Relation(_) => {}
_ => bail!("invalid truncation for non-rel relish {}", rel),
};
info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);

View File

@@ -268,7 +268,10 @@ impl PageServerHandler {
.observe_closure_duration(|| {
// Return 0 if relation is not found.
// This is what postgres smgr expects.
timeline.get_relish_size(tag, req.lsn).unwrap_or(Some(0)).unwrap_or(0)
timeline
.get_relish_size(tag, req.lsn)
.unwrap_or(Some(0))
.unwrap_or(0)
});
PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks })

View File

@@ -19,8 +19,8 @@ use crate::repository::*;
use crate::waldecoder::*;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::Oid;
use postgres_ffi::{pg_constants, CheckPoint, ControlFileData};
use postgres_ffi::{Oid, TransactionId};
use zenith_utils::lsn::Lsn;
const MAX_MBR_BLKNO: u32 =
@@ -531,7 +531,9 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
assert_eq!(src_rel.spcnode, src_tablespace_id);
assert_eq!(src_rel.dbnode, src_db_id);
let nblocks = timeline.get_relish_size(RelishTag::Relation(src_rel), req_lsn)?.unwrap_or(0);
let nblocks = timeline
.get_relish_size(RelishTag::Relation(src_rel), req_lsn)?
.unwrap_or(0);
let dst_rel = RelTag {
spcnode: tablespace_id,
dbnode: db_id,
@@ -705,43 +707,98 @@ fn save_xact_record(
Ok(())
}
// See TransactionIdPrecedes in transam.c
fn transaction_id_precedes(id1: TransactionId, id2: TransactionId) -> bool {
/*
* If either ID is a permanent XID then we can just do unsigned
* comparison. If both are normal, do a modulo-2^32 comparison.
*/
if !(id1 >= pg_constants::FIRST_NORMAL_TRANSACTION_ID)
|| !(id2 >= pg_constants::FIRST_NORMAL_TRANSACTION_ID)
{
return id1 < id2;
}
let diff = id1.wrapping_sub(id2) as i32;
return diff < 0;
}
// See CLOGPagePrecedes in clog.c
fn clogpage_precedes(page1: u32, page2: u32) -> bool {
let mut xid1 = page1 * pg_constants::CLOG_XACTS_PER_PAGE;
xid1 += pg_constants::FIRST_NORMAL_TRANSACTION_ID + 1;
let mut xid2 = page2 * pg_constants::CLOG_XACTS_PER_PAGE;
xid2 += pg_constants::FIRST_NORMAL_TRANSACTION_ID + 1;
return transaction_id_precedes(xid1, xid2)
&& transaction_id_precedes(xid1, xid2 + pg_constants::CLOG_XACTS_PER_PAGE - 1);
}
// See SlruMayDeleteSegment() in slru.c
fn slru_may_delete_clogsegment(segpage: u32, cutoff_page: u32) -> bool {
let seg_last_page = segpage + pg_constants::SLRU_PAGES_PER_SEGMENT - 1;
assert_eq!(segpage % pg_constants::SLRU_PAGES_PER_SEGMENT, 0);
return clogpage_precedes(segpage, cutoff_page)
&& clogpage_precedes(seg_last_page, cutoff_page);
}
fn save_clog_truncate_record(
checkpoint: &mut CheckPoint,
_timeline: &dyn Timeline,
timeline: &dyn Timeline,
lsn: Lsn,
xlrec: &XlClogTruncate,
) -> Result<()> {
info!(
"RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {} lsn {}",
xlrec.pageno,
xlrec.oldest_xid,
xlrec.oldest_xid_db,
lsn
xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db, lsn
);
// Here we treat oldestXid and oldestXidDB
// differently from postgres redo routines.
// In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
// until checkpoint happens and updates the value.
// Here we can use the most recent value.
// It's just an optimization, though and can be deleted.
// TODO Figure out if there will be any issues with replica.
checkpoint.oldestXid = xlrec.oldest_xid;
checkpoint.oldestXidDB = xlrec.oldest_xid_db;
// FIXME: Handle XID wraparound! I just commented this out,
// because it was wrong in a dangerous way. But what this should
// now do is identify the CLOG segments in the repository that are
// older than the threshold in the WAL recor - taking XID
// wraparound into account like the corresponding PostgreSQL code
// does! - and call put_unlink() for the segments that are no
// longer needed.
// TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
/*
if let Some(ObjectTag::Clog(first_slru_tag)) =
timeline.get_next_tag(ObjectTag::Clog(SlruBufferTag { blknum: 0 }))?
{
for trunc_blknum in first_slru_tag.blknum..=pageno {
let tag = ObjectTag::Clog(SlruBufferTag {
blknum: trunc_blknum,
});
timeline.put_slru_truncate(tag, lsn)?;
let latest_page_number = checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
// Now delete all segments containing pages between xlrec.pageno
// and latest_page_number.
// First, make an important safety check:
// the current endpoint page must not be eligible for removal.
// See SimpleLruTruncate() in slru.c
if clogpage_precedes(latest_page_number, xlrec.pageno) {
info!("could not truncate directory pg_xact apparent wraparound");
return Ok(());
}
// Iterate via SLRU CLOG segments and unlink segments that we're ready to truncate
// TODO This implementation is very inefficient -
// it scans all non-rels only to find Clog
for obj in timeline.list_nonrels(lsn)? {
match obj {
RelishTag::Slru { slru, segno } => {
if slru == SlruKind::Clog {
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
timeline.put_unlink(RelishTag::Slru { slru, segno }, lsn)?;
trace!("unlink CLOG segment {:>04X} at lsn {}", segno, lsn);
}
}
}
_ => {}
}
*/
}
Ok(())
}
@@ -814,6 +871,7 @@ fn save_multixact_create_record(
acc
}
});
checkpoint.update_next_xid(max_mbr_xid);
Ok(())
}