mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
safekeeper: use max end lsn as start of next batch (#11152)
## Problem Partial reads are still problematic. They are stored in the buffer of the wal decoder and result in gaps being reported too eagerly on the pageserver side. ## Summary of changes Previously, we always used the start LSN of the chunk of WAL that was just read. This patch switches to using the end LSN of the last record that was decoded in the previous iteration.
This commit is contained in:
@@ -396,6 +396,14 @@ pub mod waldecoder {
|
||||
self.lsn + self.inputbuf.remaining() as u64
|
||||
}
|
||||
|
||||
/// Returns the LSN up to which the WAL decoder has processed.
|
||||
///
|
||||
/// If [`Self::poll_decode`] returned a record, then this will return
|
||||
/// the end LSN of said record.
|
||||
pub fn lsn(&self) -> Lsn {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
pub fn feed_bytes(&mut self, buf: &[u8]) {
|
||||
self.inputbuf.extend_from_slice(buf);
|
||||
}
|
||||
|
||||
@@ -219,12 +219,12 @@ impl InterpretedWalReaderState {
|
||||
}
|
||||
}
|
||||
|
||||
fn take_current_batch_wal_start(&mut self) -> Lsn {
|
||||
fn replace_current_batch_wal_start(&mut self, with: Lsn) -> Lsn {
|
||||
match self {
|
||||
InterpretedWalReaderState::Running {
|
||||
current_batch_wal_start,
|
||||
..
|
||||
} => current_batch_wal_start.take().unwrap(),
|
||||
} => current_batch_wal_start.replace(with).unwrap(),
|
||||
InterpretedWalReaderState::Done => {
|
||||
panic!("take_current_batch_wal_start called on finished reader")
|
||||
}
|
||||
@@ -416,10 +416,12 @@ impl InterpretedWalReader {
|
||||
let shard_ids = self.shard_senders.keys().copied().collect::<Vec<_>>();
|
||||
let mut records_by_sender: HashMap<ShardSenderId, Vec<InterpretedWalRecord>> = HashMap::new();
|
||||
let mut max_next_record_lsn = None;
|
||||
let mut max_end_record_lsn = None;
|
||||
while let Some((next_record_lsn, recdata)) = wal_decoder.poll_decode()?
|
||||
{
|
||||
assert!(next_record_lsn.is_aligned());
|
||||
max_next_record_lsn = Some(next_record_lsn);
|
||||
max_end_record_lsn = Some(wal_decoder.lsn());
|
||||
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
@@ -470,7 +472,7 @@ impl InterpretedWalReader {
|
||||
let batch_wal_start_lsn = {
|
||||
let mut guard = self.state.write().unwrap();
|
||||
guard.update_current_position(max_next_record_lsn);
|
||||
guard.take_current_batch_wal_start()
|
||||
guard.replace_current_batch_wal_start(max_end_record_lsn.unwrap())
|
||||
};
|
||||
|
||||
// Send interpreted records downstream. Anything that has already been seen
|
||||
|
||||
Reference in New Issue
Block a user