1. Always start repliction from the begging of WAL segment (to be able to skip missed segments)

2. Do not materialize always last version of objects in GC (only when needed)
3. Fix history test
4. Fix CPU consumption in wal_keeper when connection is broken
5. Fix handling of --recall parameter in walkeeper
This commit is contained in:
Konstantin Knizhnik
2021-06-11 15:49:38 +03:00
parent 04e1ee5ce3
commit 87fce3fcd5
8 changed files with 40 additions and 37 deletions

View File

@@ -290,7 +290,7 @@ impl Timeline for ObjectTimeline {
self.put_page_image(tag, lsn, page_img.clone())?;
}
_ => page_img = Bytes::from_static(&ZERO_PAGE),
x => bail!("Unexpected object value: {:?}", x),
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
@@ -554,16 +554,16 @@ impl Timeline for ObjectTimeline {
fn advance_last_record_lsn(&self, lsn: Lsn) {
// Can't move backwards.
let old = self.last_record_lsn.fetch_max(lsn);
assert!(old <= lsn);
// Also advance last_valid_lsn
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
if lsn < old {
warn!(
"attempted to move last record LSN backwards (was {}, new {})",
old, lsn
);
if old <= lsn {
// Also advance last_valid_lsn
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
if lsn < old {
warn!(
"attempted to move last record LSN backwards (was {}, new {})",
old, lsn
);
}
}
}
fn get_last_record_lsn(&self) -> Lsn {
@@ -779,9 +779,6 @@ impl ObjectTimeline {
}
}
ObjectTag::RelationBuffer(tag) => {
// Reconstruct last page
self.get_page_at_lsn_nowait(obj, last_lsn)?;
// Reconstruct page at horizon unless relation was dropped
// and delete all older versions over horizon
let mut last_version = true;
@@ -820,9 +817,6 @@ impl ObjectTimeline {
ObjectTag::Clog(_)
| ObjectTag::MultiXactOffsets(_)
| ObjectTag::MultiXactMembers(_) => {
// Materialize last version
self.get_page_at_lsn_nowait(obj, last_lsn)?;
// Remove old versions over horizon
let mut last_version = true;
let key = ObjectKey {

View File

@@ -566,13 +566,13 @@ mod tests {
tline.advance_last_valid_lsn(Lsn(2));
let mut snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(2));
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
let expected_truncate = RelationUpdate {
rel,
lsn: Lsn(2),
update: Update::Truncate { n_blocks: 0 },
};
assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); // TODO ordering not guaranteed by API
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);
Ok(())

View File

@@ -97,10 +97,12 @@ impl WalStreamDecoder {
let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf);
if hdr.std.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
msg: "invalid xlog segment header".into(),
lsn: self.lsn,
});
info!(
"Receive page with LSN {} instead of expected {}",
Lsn(hdr.std.xlp_pageaddr),
self.lsn
);
self.lsn = Lsn(hdr.std.xlp_pageaddr);
}
// TODO: verify the remaining fields in the header

View File

@@ -150,18 +150,12 @@ fn walreceiver_main(
error!("No previous WAL position");
}
// FIXME: We have to do it to handle new segment generated by pg_resetwal at compute node startup
startpoint = Lsn::max(
startpoint,
Lsn(end_of_wal.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)),
);
// FIXME: We have to do it to handle new segment generated by pg_resetwal at compute node startup.
// This hack is needed only when pageserver is connected directly to compute node, not through walkeeper.
startpoint = Lsn::max(startpoint, end_of_wal);
// There might be some padding after the last full record, skip it.
//
// FIXME: It probably would be better to always start streaming from the beginning
// of the page, or the segment, so that we could check the page/segment headers
// too. Just for the sake of paranoia.
startpoint += startpoint.calc_padding(8u32);
// Always start from the beginning of segment
startpoint = Lsn(startpoint.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1));
debug!(
"last_record_lsn {} starting replication from {} for timeline {}, server is at {}...",

View File

@@ -48,7 +48,7 @@ fn main() -> Result<()> {
.help("interval for keeping WAL as walkeeper node, after which them will be uploaded to S3 and removed locally"),
)
.arg(
Arg::with_name("recall-period")
Arg::with_name("recall")
.long("recall")
.takes_value(true)
.help("Period for requestion pageserver to call for replication"),

View File

@@ -248,7 +248,14 @@ impl ReceiveWalConn {
let (flush_lsn, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, true);
my_info.flush_lsn = flush_lsn;
let min_lsn = Lsn((server_info.wal_seg_size as u64) * 2); // strt from second segment because of pg_resetwal
// FIXME: Yet another trick to handle creation of new WAL segment at
// compute node startup (a-la pg_resetwal).
// If restart_lsn was not adjusted then walproposer will try to perform recovery
// because restart_lsn != flush_lsn and fail because first WAL segment is missed.
// May be it is better to handle it by wal proposer, but it will contradict with wal_proposer
// usage in other branches.
// This adjustment is needed only for first segment.
let min_lsn = Lsn((server_info.wal_seg_size as u64) * 2);
my_info.restart_lsn = Lsn::max(my_info.restart_lsn, min_lsn);
my_info.server.timeline = timeline;

View File

@@ -63,11 +63,15 @@ impl ReplicationConn {
let feedback = HotStandbyFeedback::des(&m)?;
timeline.add_hs_feedback(feedback)
}
msg => {
None => {
break;
}
Some(msg) => {
info!("unexpected message {:?}", msg);
}
}
}
Err(anyhow!("Connection closed"))
}
/// Helper function that parses a pair of LSNs.

View File

@@ -232,8 +232,10 @@ impl TimelineTools for Option<Arc<Timeline>> {
/// Find last WAL record. If "precise" is false then just locate last partial segment
fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID) {
let seg_size = self.get().get_info().server.wal_seg_size as usize;
assert!(seg_size > 0);
let (lsn, timeline) = find_end_of_wal(data_dir, seg_size, precise);
let wal_start = Lsn((seg_size * 2) as u64); // FIXME: handle pg_resetwal
// FIXME: because of generation of new segment at compute node start we just do not have first WAL segment
let wal_start = Lsn((seg_size * 2) as u64);
let lsn = Lsn::max(Lsn(lsn), wal_start);
(lsn, timeline)
}