From 8c553297cb316bb2c39a799074cffff6db06d8f4 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Mon, 10 Mar 2025 16:33:28 +0000 Subject: [PATCH] safekeeper: use max end lsn as start of next batch (#11152) ## Problem Partial reads are still problematic. They are stored in the buffer of the wal decoder and result in gaps being reported too eagerly on the pageserver side. ## Summary of changes Previously, we always used the start LSN of the chunk of WAL that was just read. This patch switches to using the end LSN of the last record that was decoded in the previous iteration. --- libs/postgres_ffi/src/lib.rs | 8 ++++++++ safekeeper/src/send_interpreted_wal.rs | 8 +++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 8dfd8d8750..05d8de4c7a 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -396,6 +396,14 @@ pub mod waldecoder { self.lsn + self.inputbuf.remaining() as u64 } + /// Returns the LSN up to which the WAL decoder has processed. + /// + /// If [`Self::poll_decode`] returned a record, then this will return + /// the end LSN of said record. + pub fn lsn(&self) -> Lsn { + self.lsn + } + pub fn feed_bytes(&mut self, buf: &[u8]) { self.inputbuf.extend_from_slice(buf); } diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index c71f23a010..2b1fd7b854 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -219,12 +219,12 @@ impl InterpretedWalReaderState { } } - fn take_current_batch_wal_start(&mut self) -> Lsn { + fn replace_current_batch_wal_start(&mut self, with: Lsn) -> Lsn { match self { InterpretedWalReaderState::Running { current_batch_wal_start, .. 
- } => current_batch_wal_start.take().unwrap(), + } => current_batch_wal_start.replace(with).unwrap(), InterpretedWalReaderState::Done => { panic!("take_current_batch_wal_start called on finished reader") } @@ -416,10 +416,12 @@ impl InterpretedWalReader { let shard_ids = self.shard_senders.keys().copied().collect::<Vec<_>>(); let mut records_by_sender: HashMap<ShardSenderId, Vec<InterpretedWalRecord>> = HashMap::new(); let mut max_next_record_lsn = None; + let mut max_end_record_lsn = None; while let Some((next_record_lsn, recdata)) = wal_decoder.poll_decode()? { assert!(next_record_lsn.is_aligned()); max_next_record_lsn = Some(next_record_lsn); + max_end_record_lsn = Some(wal_decoder.lsn()); let interpreted = InterpretedWalRecord::from_bytes_filtered( recdata, @@ -470,7 +472,7 @@ impl InterpretedWalReader { let batch_wal_start_lsn = { let mut guard = self.state.write().unwrap(); guard.update_current_position(max_next_record_lsn); - guard.take_current_batch_wal_start() + guard.replace_current_batch_wal_start(max_end_record_lsn.unwrap()) }; // Send interpreted records downstream. Anything that has already been seen