diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 8dfd8d8750..05d8de4c7a 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -396,6 +396,14 @@ pub mod waldecoder { self.lsn + self.inputbuf.remaining() as u64 } + /// Returns the LSN up to which the WAL decoder has processed. + /// + /// If [`Self::poll_decode`] returned a record, then this will return + /// the end LSN of said record. + pub fn lsn(&self) -> Lsn { + self.lsn + } + pub fn feed_bytes(&mut self, buf: &[u8]) { self.inputbuf.extend_from_slice(buf); } diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index c71f23a010..2b1fd7b854 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -219,12 +219,12 @@ impl InterpretedWalReaderState { } } - fn take_current_batch_wal_start(&mut self) -> Lsn { + fn replace_current_batch_wal_start(&mut self, with: Lsn) -> Lsn { match self { InterpretedWalReaderState::Running { current_batch_wal_start, .. - } => current_batch_wal_start.take().unwrap(), + } => current_batch_wal_start.replace(with).unwrap(), InterpretedWalReaderState::Done => { panic!("take_current_batch_wal_start called on finished reader") } @@ -416,10 +416,12 @@ impl InterpretedWalReader { let shard_ids = self.shard_senders.keys().copied().collect::>(); let mut records_by_sender: HashMap> = HashMap::new(); let mut max_next_record_lsn = None; + let mut max_end_record_lsn = None; while let Some((next_record_lsn, recdata)) = wal_decoder.poll_decode()? { assert!(next_record_lsn.is_aligned()); max_next_record_lsn = Some(next_record_lsn); + max_end_record_lsn = Some(wal_decoder.lsn()); let interpreted = InterpretedWalRecord::from_bytes_filtered( recdata, @@ -470,7 +472,7 @@ impl InterpretedWalReader { let batch_wal_start_lsn = { let mut guard = self.state.write().unwrap(); guard.update_current_position(max_next_record_lsn); - guard.take_current_batch_wal_start() + guard.replace_current_batch_wal_start(max_end_record_lsn.unwrap()) }; // Send interpreted records downstream. Anything that has already been seen