diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index de253be46a..0b438dc9dd 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -47,10 +47,12 @@ pub trait Timeline { src_tablespace_id: Oid, ) -> Result<()>; - fn advance_last_record_lsn(&self, lsn: Lsn); fn advance_last_valid_lsn(&self, lsn: Lsn); - fn init_valid_lsn(&self, lsn: Lsn); fn get_last_valid_lsn(&self) -> Lsn; + fn init_valid_lsn(&self, lsn: Lsn); + + fn advance_last_record_lsn(&self, lsn: Lsn); + fn get_last_record_lsn(&self) -> Lsn; } #[derive(Clone)] diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index b3704f6ca5..2127569b2d 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -828,6 +828,10 @@ impl Timeline for RocksTimeline { } } + fn get_last_record_lsn(&self) -> Lsn { + self.last_record_lsn.load() + } + fn init_valid_lsn(&self, lsn: Lsn) { let old = self.last_valid_lsn.advance(lsn); assert!(old == Lsn(0)); diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index d6f7fa4fef..97002ceac4 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -158,8 +158,11 @@ fn walreceiver_main( // // Start streaming the WAL, from where we left off previously. // - let mut startpoint = timeline.get_last_valid_lsn(); - let last_valid_lsn = timeline.get_last_valid_lsn(); + // If we had previously received WAL up to some point in the middle of a WAL record, we + // had better start from the end of the last full WAL record, not in the middle of one. Hence, + // use 'last_record_lsn' rather than 'last_valid_lsn' here. 
+ let last_rec_lsn = timeline.get_last_record_lsn(); + let mut startpoint = last_rec_lsn; if startpoint == Lsn(0) { // If we start here with identify.xlogpos we will have race condition with // postgres start: insert into postgres may request page that was modified with lsn @@ -180,8 +183,8 @@ fn walreceiver_main( startpoint += startpoint.calc_padding(8u32); } debug!( - "last_valid_lsn {} starting replication from {} for timeline {}, server is at {}...", - last_valid_lsn, startpoint, timelineid, end_of_wal + "last_record_lsn {} starting replication from {} for timeline {}, server is at {}...", + last_rec_lsn, startpoint, timelineid, end_of_wal ); let query = format!("START_REPLICATION PHYSICAL {}", startpoint);