From 87fce3fcd5e13e44b5656fe8b7a9b92a1bd9fef4 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 11 Jun 2021 15:49:38 +0300 Subject: [PATCH] 1. Always start repliction from the begging of WAL segment (to be able to skip missed segments) 2. Do not materialize always last version of objects in GC (only when needed) 3. Fix history test 4. Fix CPU consumption in wal_keeper when connection is broken 5. Fix handling of --recall parameter in walkeeper --- pageserver/src/object_repository.rs | 28 +++++++++++----------------- pageserver/src/repository.rs | 2 +- pageserver/src/waldecoder.rs | 10 ++++++---- pageserver/src/walreceiver.rs | 16 +++++----------- walkeeper/src/bin/wal_acceptor.rs | 2 +- walkeeper/src/receive_wal.rs | 9 ++++++++- walkeeper/src/replication.rs | 6 +++++- walkeeper/src/timeline.rs | 4 +++- 8 files changed, 40 insertions(+), 37 deletions(-) diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index d91e12088b..7d7809b78c 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -290,7 +290,7 @@ impl Timeline for ObjectTimeline { self.put_page_image(tag, lsn, page_img.clone())?; } - _ => page_img = Bytes::from_static(&ZERO_PAGE), + x => bail!("Unexpected object value: {:?}", x), } // FIXME: assumes little-endian. Only used for the debugging log though let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); @@ -554,16 +554,16 @@ impl Timeline for ObjectTimeline { fn advance_last_record_lsn(&self, lsn: Lsn) { // Can't move backwards. let old = self.last_record_lsn.fetch_max(lsn); - assert!(old <= lsn); - - // Also advance last_valid_lsn - let old = self.last_valid_lsn.advance(lsn); - // Can't move backwards. - if lsn < old { - warn!( - "attempted to move last record LSN backwards (was {}, new {})", - old, lsn - ); + if old <= lsn { + // Also advance last_valid_lsn + let old = self.last_valid_lsn.advance(lsn); + // Can't move backwards. + if lsn < old { + warn!( + "attempted to move last record LSN backwards (was {}, new {})", + old, lsn + ); + } } } fn get_last_record_lsn(&self) -> Lsn { @@ -779,9 +779,6 @@ impl ObjectTimeline { } } ObjectTag::RelationBuffer(tag) => { - // Reconstruct last page - self.get_page_at_lsn_nowait(obj, last_lsn)?; - // Reconstruct page at horizon unless relation was dropped // and delete all older versions over horizon let mut last_version = true; @@ -820,9 +817,6 @@ impl ObjectTimeline { ObjectTag::Clog(_) | ObjectTag::MultiXactOffsets(_) | ObjectTag::MultiXactMembers(_) => { - // Materialize last version - self.get_page_at_lsn_nowait(obj, last_lsn)?; - // Remove old versions over horizon let mut last_version = true; let key = ObjectKey { diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 256af4d1c1..3d95dce9a0 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -566,13 +566,13 @@ mod tests { tline.advance_last_valid_lsn(Lsn(2)); let mut snapshot = tline.history()?; assert_eq!(snapshot.lsn(), Lsn(2)); - assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); let expected_truncate = RelationUpdate { rel, lsn: Lsn(2), update: Update::Truncate { n_blocks: 0 }, }; assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); // TODO ordering not guaranteed by API + assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); assert_eq!(None, snapshot.next().transpose()?); Ok(()) diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 52dd2a0d20..b0c4be8133 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -97,10 +97,12 @@ impl WalStreamDecoder { let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf); if hdr.std.xlp_pageaddr != self.lsn.0 { - return Err(WalDecodeError { - msg: "invalid xlog segment header".into(), - lsn: self.lsn, - }); + info!( + "Receive page with LSN {} instead of expected {}", + Lsn(hdr.std.xlp_pageaddr), + self.lsn + ); + self.lsn = Lsn(hdr.std.xlp_pageaddr); } // TODO: verify the remaining fields in the header diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 7aa4c3fa97..1d98a0f6fc 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -150,18 +150,12 @@ fn walreceiver_main( error!("No previous WAL position"); } - // FIXME: We have to do it to handle new segment generated by pg_resetwal at compute node startup - startpoint = Lsn::max( - startpoint, - Lsn(end_of_wal.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)), - ); + // FIXME: We have to do it to handle new segment generated by pg_resetwal at compute node startup. + // This hack is needed only when pageserver is connected directly to compute node, not through walkeeper. + startpoint = Lsn::max(startpoint, end_of_wal); - // There might be some padding after the last full record, skip it. - // - // FIXME: It probably would be better to always start streaming from the beginning - // of the page, or the segment, so that we could check the page/segment headers - // too. Just for the sake of paranoia. - startpoint += startpoint.calc_padding(8u32); + // Always start from the beginning of segment + startpoint = Lsn(startpoint.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)); debug!( "last_record_lsn {} starting replication from {} for timeline {}, server is at {}...", diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs index 10a9632c3d..0ef0436605 100644 --- a/walkeeper/src/bin/wal_acceptor.rs +++ b/walkeeper/src/bin/wal_acceptor.rs @@ -48,7 +48,7 @@ fn main() -> Result<()> { .help("interval for keeping WAL as walkeeper node, after which them will be uploaded to S3 and removed locally"), ) .arg( - Arg::with_name("recall-period") + Arg::with_name("recall") .long("recall") .takes_value(true) .help("Period for requestion pageserver to call for replication"), diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 8b66532849..5094a256f5 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -248,7 +248,14 @@ impl ReceiveWalConn { let (flush_lsn, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, true); my_info.flush_lsn = flush_lsn; - let min_lsn = Lsn((server_info.wal_seg_size as u64) * 2); // strt from second segment because of pg_resetwal + // FIXME: Yet another trick to handle creation of new WAL segment at + // compute node startup (a-la pg_resetwal). + // If restart_lsn was not adjusted then walproposer will try to perform recovery + // because restart_lsn != flush_lsn and fail because first WAL segment is missed. + // May be it is better to handle it by wal proposer, but it will contradict with wal_proposer + // usage in other branches. + // This adjustment is needed only for first segment. + let min_lsn = Lsn((server_info.wal_seg_size as u64) * 2); my_info.restart_lsn = Lsn::max(my_info.restart_lsn, min_lsn); my_info.server.timeline = timeline; diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index cef15f242c..411da144c3 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -63,11 +63,15 @@ impl ReplicationConn { let feedback = HotStandbyFeedback::des(&m)?; timeline.add_hs_feedback(feedback) } - msg => { + None => { + break; + } + Some(msg) => { info!("unexpected message {:?}", msg); } } } + Err(anyhow!("Connection closed")) } /// Helper function that parses a pair of LSNs. diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index f110a1e293..0605f212c7 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -232,8 +232,10 @@ impl TimelineTools for Option> { /// Find last WAL record. If "precise" is false then just locate last partial segment fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID) { let seg_size = self.get().get_info().server.wal_seg_size as usize; + assert!(seg_size > 0); let (lsn, timeline) = find_end_of_wal(data_dir, seg_size, precise); - let wal_start = Lsn((seg_size * 2) as u64); // FIXME: handle pg_resetwal + // FIXME: because of generation of new segment at compute node start we just do not have first WAL segment + let wal_start = Lsn((seg_size * 2) as u64); let lsn = Lsn::max(Lsn(lsn), wal_start); (lsn, timeline) }