diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs
index d91e12088b..7d7809b78c 100644
--- a/pageserver/src/object_repository.rs
+++ b/pageserver/src/object_repository.rs
@@ -290,7 +290,7 @@ impl Timeline for ObjectTimeline {
                     self.put_page_image(tag, lsn, page_img.clone())?;
                 }
-                _ => page_img = Bytes::from_static(&ZERO_PAGE),
+                x => bail!("Unexpected object value: {:?}", x),
             }
             // FIXME: assumes little-endian. Only used for the debugging log though
             let page_lsn_hi =
                 u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
@@ -554,16 +554,16 @@ impl Timeline for ObjectTimeline {
     fn advance_last_record_lsn(&self, lsn: Lsn) {
         // Can't move backwards.
         let old = self.last_record_lsn.fetch_max(lsn);
-        assert!(old <= lsn);
-
-        // Also advance last_valid_lsn
-        let old = self.last_valid_lsn.advance(lsn);
-        // Can't move backwards.
-        if lsn < old {
-            warn!(
-                "attempted to move last record LSN backwards (was {}, new {})",
-                old, lsn
-            );
+        if old <= lsn {
+            // Also advance last_valid_lsn
+            let old = self.last_valid_lsn.advance(lsn);
+            // Can't move backwards.
+            if lsn < old {
+                warn!(
+                    "attempted to move last record LSN backwards (was {}, new {})",
+                    old, lsn
+                );
+            }
         }
     }
 
     fn get_last_record_lsn(&self) -> Lsn {
@@ -779,9 +779,6 @@ impl ObjectTimeline {
                     }
                 }
                 ObjectTag::RelationBuffer(tag) => {
-                    // Reconstruct last page
-                    self.get_page_at_lsn_nowait(obj, last_lsn)?;
-
                    // Reconstruct page at horizon unless relation was dropped
                    // and delete all older versions over horizon
                    let mut last_version = true;
@@ -820,9 +817,6 @@ impl ObjectTimeline {
                 ObjectTag::Clog(_)
                 | ObjectTag::MultiXactOffsets(_)
                 | ObjectTag::MultiXactMembers(_) => {
-                    // Materialize last version
-                    self.get_page_at_lsn_nowait(obj, last_lsn)?;
-
                     // Remove old versions over horizon
                     let mut last_version = true;
                     let key = ObjectKey {
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 256af4d1c1..3d95dce9a0 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -566,13 +566,13 @@ mod tests {
         tline.advance_last_valid_lsn(Lsn(2));
         let mut snapshot = tline.history()?;
         assert_eq!(snapshot.lsn(), Lsn(2));
-        assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
         let expected_truncate = RelationUpdate {
             rel,
             lsn: Lsn(2),
             update: Update::Truncate { n_blocks: 0 },
         };
         assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); // TODO ordering not guaranteed by API
+        assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
         assert_eq!(None, snapshot.next().transpose()?);
 
         Ok(())
diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs
index 52dd2a0d20..b0c4be8133 100644
--- a/pageserver/src/waldecoder.rs
+++ b/pageserver/src/waldecoder.rs
@@ -97,10 +97,12 @@ impl WalStreamDecoder {
                 let hdr = XLogLongPageHeaderData::from_bytes(&mut self.inputbuf);
 
                 if hdr.std.xlp_pageaddr != self.lsn.0 {
-                    return Err(WalDecodeError {
-                        msg: "invalid xlog segment header".into(),
-                        lsn: self.lsn,
-                    });
+                    info!(
+                        "Received page with LSN {} instead of expected {}",
+                        Lsn(hdr.std.xlp_pageaddr),
+                        self.lsn
+                    );
+                    self.lsn = Lsn(hdr.std.xlp_pageaddr);
                 }
 
                 // TODO: verify the remaining fields in the header
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 7aa4c3fa97..1d98a0f6fc 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -150,18 +150,12 @@ fn walreceiver_main(
         error!("No previous WAL position");
     }
 
-    // FIXME: We have to do it to handle new segment generated by pg_resetwal at compute node startup
-    startpoint = Lsn::max(
-        startpoint,
-        Lsn(end_of_wal.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)),
-    );
+    // FIXME: We have to do this to handle a new segment generated by pg_resetwal at compute node startup.
+    // This hack is only needed when the pageserver is connected directly to the compute node, not through the walkeeper.
+    startpoint = Lsn::max(startpoint, end_of_wal);
 
-    // There might be some padding after the last full record, skip it.
-    //
-    // FIXME: It probably would be better to always start streaming from the beginning
-    // of the page, or the segment, so that we could check the page/segment headers
-    // too. Just for the sake of paranoia.
-    startpoint += startpoint.calc_padding(8u32);
+    // Always start from the beginning of the segment
+    startpoint = Lsn(startpoint.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1));
 
     debug!(
         "last_record_lsn {} starting replication from {} for timeline {}, server is at {}...",
diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs
index 10a9632c3d..0ef0436605 100644
--- a/walkeeper/src/bin/wal_acceptor.rs
+++ b/walkeeper/src/bin/wal_acceptor.rs
@@ -48,7 +48,7 @@ fn main() -> Result<()> {
                 .help("interval for keeping WAL as walkeeper node, after which them will be uploaded to S3 and removed locally"),
         )
         .arg(
-            Arg::with_name("recall-period")
+            Arg::with_name("recall")
                 .long("recall")
                 .takes_value(true)
                 .help("Period for requestion pageserver to call for replication"),
diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs
index 8b66532849..5094a256f5 100644
--- a/walkeeper/src/receive_wal.rs
+++ b/walkeeper/src/receive_wal.rs
@@ -248,7 +248,14 @@ impl ReceiveWalConn {
 
         let (flush_lsn, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, true);
         my_info.flush_lsn = flush_lsn;
-        let min_lsn = Lsn((server_info.wal_seg_size as u64) * 2); // strt from second segment because of pg_resetwal
+        // FIXME: Yet another trick to handle the creation of a new WAL segment at
+        // compute node startup (a la pg_resetwal).
+        // If restart_lsn is not adjusted, walproposer will try to perform recovery
+        // because restart_lsn != flush_lsn, and fail because the first WAL segment is missing.
+        // It might be better to handle this in walproposer, but that would conflict with
+        // walproposer usage in other branches.
+        // This adjustment is only needed for the first segment.
+        let min_lsn = Lsn((server_info.wal_seg_size as u64) * 2);
         my_info.restart_lsn = Lsn::max(my_info.restart_lsn, min_lsn);
         my_info.server.timeline = timeline;
 
diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs
index cef15f242c..411da144c3 100644
--- a/walkeeper/src/replication.rs
+++ b/walkeeper/src/replication.rs
@@ -63,11 +63,15 @@ impl ReplicationConn {
                     let feedback = HotStandbyFeedback::des(&m)?;
                     timeline.add_hs_feedback(feedback)
                 }
-                msg => {
+                None => {
+                    break;
+                }
+                Some(msg) => {
                     info!("unexpected message {:?}", msg);
                 }
             }
         }
+        Err(anyhow!("Connection closed"))
     }
 
     /// Helper function that parses a pair of LSNs.
diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs
index f110a1e293..0605f212c7 100644
--- a/walkeeper/src/timeline.rs
+++ b/walkeeper/src/timeline.rs
@@ -232,8 +232,10 @@ impl TimelineTools for Option<Arc<Timeline>> {
     /// Find last WAL record. If "precise" is false then just locate last partial segment
     fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID) {
         let seg_size = self.get().get_info().server.wal_seg_size as usize;
+        assert!(seg_size > 0);
         let (lsn, timeline) = find_end_of_wal(data_dir, seg_size, precise);
-        let wal_start = Lsn((seg_size * 2) as u64); // FIXME: handle pg_resetwal
+        // FIXME: because a new segment is generated at compute node start, we just do not have the first WAL segment
+        let wal_start = Lsn((seg_size * 2) as u64);
         let lsn = Lsn::max(Lsn(lsn), wal_start);
         (lsn, timeline)
     }