mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 06:52:55 +00:00
Fix CLOG truncate handling in case of wraparound.
This commit is contained in:
@@ -100,9 +100,12 @@ impl<'a> Basebackup<'a> {
|
|||||||
.timeline
|
.timeline
|
||||||
.get_relish_size(RelishTag::Slru { slru, segno }, self.lsn)?;
|
.get_relish_size(RelishTag::Slru { slru, segno }, self.lsn)?;
|
||||||
|
|
||||||
if seg_size == None
|
if seg_size == None {
|
||||||
{
|
trace!(
|
||||||
info!("SLRU segment {}/{:>04X} was truncated", slru.to_str(), segno);
|
"SLRU segment {}/{:>04X} was truncated",
|
||||||
|
slru.to_str(),
|
||||||
|
segno
|
||||||
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -338,7 +338,10 @@ impl Timeline for ObjectTimeline {
|
|||||||
/// Return None if the relish was truncated
|
/// Return None if the relish was truncated
|
||||||
fn get_relish_size(&self, rel: RelishTag, lsn: Lsn) -> Result<Option<u32>> {
|
fn get_relish_size(&self, rel: RelishTag, lsn: Lsn) -> Result<Option<u32>> {
|
||||||
if !rel.is_blocky() {
|
if !rel.is_blocky() {
|
||||||
bail!("invalid get_relish_size request for non-blocky relish {}", rel);
|
bail!(
|
||||||
|
"invalid get_relish_size request for non-blocky relish {}",
|
||||||
|
rel
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let lsn = self.wait_lsn(lsn)?;
|
let lsn = self.wait_lsn(lsn)?;
|
||||||
@@ -577,10 +580,9 @@ impl Timeline for ObjectTimeline {
|
|||||||
/// To truncate SLRU segments use put_unlink() function.
|
/// To truncate SLRU segments use put_unlink() function.
|
||||||
///
|
///
|
||||||
fn put_truncation(&self, rel: RelishTag, lsn: Lsn, nblocks: u32) -> Result<()> {
|
fn put_truncation(&self, rel: RelishTag, lsn: Lsn, nblocks: u32) -> Result<()> {
|
||||||
|
|
||||||
match rel {
|
match rel {
|
||||||
RelishTag::Relation(_) => {},
|
RelishTag::Relation(_) => {}
|
||||||
_ => bail!("invalid truncation for non-rel relish {}", rel)
|
_ => bail!("invalid truncation for non-rel relish {}", rel),
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
|
info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
|
||||||
|
|||||||
@@ -268,7 +268,10 @@ impl PageServerHandler {
|
|||||||
.observe_closure_duration(|| {
|
.observe_closure_duration(|| {
|
||||||
// Return 0 if relation is not found.
|
// Return 0 if relation is not found.
|
||||||
// This is what postgres smgr expects.
|
// This is what postgres smgr expects.
|
||||||
timeline.get_relish_size(tag, req.lsn).unwrap_or(Some(0)).unwrap_or(0)
|
timeline
|
||||||
|
.get_relish_size(tag, req.lsn)
|
||||||
|
.unwrap_or(Some(0))
|
||||||
|
.unwrap_or(0)
|
||||||
});
|
});
|
||||||
|
|
||||||
PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks })
|
PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks })
|
||||||
|
|||||||
@@ -19,8 +19,8 @@ use crate::repository::*;
|
|||||||
use crate::waldecoder::*;
|
use crate::waldecoder::*;
|
||||||
use postgres_ffi::relfile_utils::*;
|
use postgres_ffi::relfile_utils::*;
|
||||||
use postgres_ffi::xlog_utils::*;
|
use postgres_ffi::xlog_utils::*;
|
||||||
use postgres_ffi::Oid;
|
|
||||||
use postgres_ffi::{pg_constants, CheckPoint, ControlFileData};
|
use postgres_ffi::{pg_constants, CheckPoint, ControlFileData};
|
||||||
|
use postgres_ffi::{Oid, TransactionId};
|
||||||
use zenith_utils::lsn::Lsn;
|
use zenith_utils::lsn::Lsn;
|
||||||
|
|
||||||
const MAX_MBR_BLKNO: u32 =
|
const MAX_MBR_BLKNO: u32 =
|
||||||
@@ -531,7 +531,9 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
|
|||||||
assert_eq!(src_rel.spcnode, src_tablespace_id);
|
assert_eq!(src_rel.spcnode, src_tablespace_id);
|
||||||
assert_eq!(src_rel.dbnode, src_db_id);
|
assert_eq!(src_rel.dbnode, src_db_id);
|
||||||
|
|
||||||
let nblocks = timeline.get_relish_size(RelishTag::Relation(src_rel), req_lsn)?.unwrap_or(0);
|
let nblocks = timeline
|
||||||
|
.get_relish_size(RelishTag::Relation(src_rel), req_lsn)?
|
||||||
|
.unwrap_or(0);
|
||||||
let dst_rel = RelTag {
|
let dst_rel = RelTag {
|
||||||
spcnode: tablespace_id,
|
spcnode: tablespace_id,
|
||||||
dbnode: db_id,
|
dbnode: db_id,
|
||||||
@@ -705,43 +707,98 @@ fn save_xact_record(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// See TransactionIdPrecedes in transam.c
|
||||||
|
fn transaction_id_precedes(id1: TransactionId, id2: TransactionId) -> bool {
|
||||||
|
/*
|
||||||
|
* If either ID is a permanent XID then we can just do unsigned
|
||||||
|
* comparison. If both are normal, do a modulo-2^32 comparison.
|
||||||
|
*/
|
||||||
|
|
||||||
|
if !(id1 >= pg_constants::FIRST_NORMAL_TRANSACTION_ID)
|
||||||
|
|| !(id2 >= pg_constants::FIRST_NORMAL_TRANSACTION_ID)
|
||||||
|
{
|
||||||
|
return id1 < id2;
|
||||||
|
}
|
||||||
|
|
||||||
|
let diff = id1.wrapping_sub(id2) as i32;
|
||||||
|
return diff < 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// See CLOGPagePrecedes in clog.c
|
||||||
|
fn clogpage_precedes(page1: u32, page2: u32) -> bool {
|
||||||
|
let mut xid1 = page1 * pg_constants::CLOG_XACTS_PER_PAGE;
|
||||||
|
xid1 += pg_constants::FIRST_NORMAL_TRANSACTION_ID + 1;
|
||||||
|
let mut xid2 = page2 * pg_constants::CLOG_XACTS_PER_PAGE;
|
||||||
|
xid2 += pg_constants::FIRST_NORMAL_TRANSACTION_ID + 1;
|
||||||
|
|
||||||
|
return transaction_id_precedes(xid1, xid2)
|
||||||
|
&& transaction_id_precedes(xid1, xid2 + pg_constants::CLOG_XACTS_PER_PAGE - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// See SlruMayDeleteSegment() in slru.c
|
||||||
|
fn slru_may_delete_clogsegment(segpage: u32, cutoff_page: u32) -> bool {
|
||||||
|
let seg_last_page = segpage + pg_constants::SLRU_PAGES_PER_SEGMENT - 1;
|
||||||
|
|
||||||
|
assert_eq!(segpage % pg_constants::SLRU_PAGES_PER_SEGMENT, 0);
|
||||||
|
|
||||||
|
return clogpage_precedes(segpage, cutoff_page)
|
||||||
|
&& clogpage_precedes(seg_last_page, cutoff_page);
|
||||||
|
}
|
||||||
|
|
||||||
fn save_clog_truncate_record(
|
fn save_clog_truncate_record(
|
||||||
checkpoint: &mut CheckPoint,
|
checkpoint: &mut CheckPoint,
|
||||||
_timeline: &dyn Timeline,
|
timeline: &dyn Timeline,
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
xlrec: &XlClogTruncate,
|
xlrec: &XlClogTruncate,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
info!(
|
info!(
|
||||||
"RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {} lsn {}",
|
"RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {} lsn {}",
|
||||||
xlrec.pageno,
|
xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db, lsn
|
||||||
xlrec.oldest_xid,
|
|
||||||
xlrec.oldest_xid_db,
|
|
||||||
lsn
|
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Here we treat oldestXid and oldestXidDB
|
||||||
|
// differently from postgres redo routines.
|
||||||
|
// In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid
|
||||||
|
// until checkpoint happens and updates the value.
|
||||||
|
// Here we can use the most recent value.
|
||||||
|
// It's just an optimization, though and can be deleted.
|
||||||
|
// TODO Figure out if there will be any issues with replica.
|
||||||
checkpoint.oldestXid = xlrec.oldest_xid;
|
checkpoint.oldestXid = xlrec.oldest_xid;
|
||||||
checkpoint.oldestXidDB = xlrec.oldest_xid_db;
|
checkpoint.oldestXidDB = xlrec.oldest_xid_db;
|
||||||
|
|
||||||
// FIXME: Handle XID wraparound! I just commented this out,
|
// TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
|
||||||
// because it was wrong in a dangerous way. But what this should
|
|
||||||
// now do is identify the CLOG segments in the repository that are
|
|
||||||
// older than the threshold in the WAL recor - taking XID
|
|
||||||
// wraparound into account like the corresponding PostgreSQL code
|
|
||||||
// does! - and call put_unlink() for the segments that are no
|
|
||||||
// longer needed.
|
|
||||||
|
|
||||||
/*
|
let latest_page_number = checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||||
if let Some(ObjectTag::Clog(first_slru_tag)) =
|
|
||||||
timeline.get_next_tag(ObjectTag::Clog(SlruBufferTag { blknum: 0 }))?
|
// Now delete all segments containing pages between xlrec.pageno
|
||||||
{
|
// and latest_page_number.
|
||||||
for trunc_blknum in first_slru_tag.blknum..=pageno {
|
|
||||||
let tag = ObjectTag::Clog(SlruBufferTag {
|
// First, make an important safety check:
|
||||||
blknum: trunc_blknum,
|
// the current endpoint page must not be eligible for removal.
|
||||||
});
|
// See SimpleLruTruncate() in slru.c
|
||||||
timeline.put_slru_truncate(tag, lsn)?;
|
if clogpage_precedes(latest_page_number, xlrec.pageno) {
|
||||||
|
info!("could not truncate directory pg_xact apparent wraparound");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate via SLRU CLOG segments and unlink segments that we're ready to truncate
|
||||||
|
// TODO This implementation is very inefficient -
|
||||||
|
// it scans all non-rels only to find Clog
|
||||||
|
for obj in timeline.list_nonrels(lsn)? {
|
||||||
|
match obj {
|
||||||
|
RelishTag::Slru { slru, segno } => {
|
||||||
|
if slru == SlruKind::Clog {
|
||||||
|
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||||
|
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
|
||||||
|
timeline.put_unlink(RelishTag::Slru { slru, segno }, lsn)?;
|
||||||
|
trace!("unlink CLOG segment {:>04X} at lsn {}", segno, lsn);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
_ => {}
|
||||||
}
|
}
|
||||||
*/
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -814,6 +871,7 @@ fn save_multixact_create_record(
|
|||||||
acc
|
acc
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
checkpoint.update_next_xid(max_mbr_xid);
|
checkpoint.update_next_xid(max_mbr_xid);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user