From 937876cbe2544fbbd4a436ad6d207909808cce5d Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 7 Mar 2025 15:52:01 +0000 Subject: [PATCH] safekeeper: don't skip empty records for shard zero (#11137) ## Problem Shard zero needs to track the start LSN of the latest record in adition to the LSN of the next record to ingest. The former is included in basebackup persisted by the compute in WAL. Previously, empty records were skipped for all shards. This caused the prev LSN tracking on the PS to fall behind and led to logical replication issues. ## Summary of changes Shard zero now receives emtpy interpreted records for LSN tracking purposes. A test is included too. --- safekeeper/src/send_interpreted_wal.rs | 119 ++++++++++++++++++++++--- safekeeper/src/test_utils.rs | 3 +- safekeeper/src/wal_reader_stream.rs | 2 +- 3 files changed, 112 insertions(+), 12 deletions(-) diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index bf03f27d48..c71f23a010 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -430,7 +430,10 @@ impl InterpretedWalReader { .with_context(|| "Failed to interpret WAL")?; for (shard, record) in interpreted { - if record.is_empty() { + // Shard zero needs to track the start LSN of the latest record + // in adition to the LSN of the next record to ingest. The former + // is included in basebackup persisted by the compute in WAL. + if !shard.is_shard_zero() && record.is_empty() { continue; } @@ -740,7 +743,7 @@ mod tests { .unwrap(); let resident_tli = tli.wal_residence_guard().await.unwrap(); - let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None) + let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None) .await .unwrap(); let end_pos = end_watch.get(); @@ -883,10 +886,16 @@ mod tests { let resident_tli = tli.wal_residence_guard().await.unwrap(); let mut next_record_lsns = Vec::default(); - let end_watch = - Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns)) - .await - .unwrap(); + let end_watch = Env::write_wal( + tli, + start_lsn, + SIZE, + MSG_COUNT, + c"neon-file:", + Some(&mut next_record_lsns), + ) + .await + .unwrap(); let end_pos = end_watch.get(); let streaming_wal_reader = StreamingWalReader::new( @@ -1027,10 +1036,16 @@ mod tests { .unwrap(); let resident_tli = tli.wal_residence_guard().await.unwrap(); - let end_watch = - Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns)) - .await - .unwrap(); + let end_watch = Env::write_wal( + tli, + start_lsn, + SIZE, + MSG_COUNT, + c"neon-file:", + Some(&mut next_record_lsns), + ) + .await + .unwrap(); assert!(next_record_lsns.len() > 3); let shard_0_start_lsn = next_record_lsns[3]; @@ -1124,4 +1139,88 @@ mod tests { } } } + + #[tokio::test] + async fn test_shard_zero_does_not_skip_empty_records() { + let _ = env_logger::builder().is_test(true).try_init(); + + const SIZE: usize = 8 * 1024; + const MSG_COUNT: usize = 10; + const PG_VERSION: u32 = 17; + + let start_lsn = Lsn::from_str("0/149FD18").unwrap(); + let env = Env::new(true).unwrap(); + let tli = env + .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn) + .await + .unwrap(); + + let resident_tli = tli.wal_residence_guard().await.unwrap(); + let mut next_record_lsns = Vec::new(); + let end_watch = Env::write_wal( + tli, + start_lsn, + SIZE, + MSG_COUNT, + // This is a logical message prefix that is not persisted to key value storage. + // We use it in order to validate that shard zero receives emtpy interpreted records. + c"test:", + Some(&mut next_record_lsns), + ) + .await + .unwrap(); + let end_pos = end_watch.get(); + + let streaming_wal_reader = StreamingWalReader::new( + resident_tli, + None, + start_lsn, + end_pos, + end_watch, + MAX_SEND_SIZE, + ); + + let shard = ShardIdentity::unsharded(); + let (records_tx, mut records_rx) = tokio::sync::mpsc::channel::(MSG_COUNT * 2); + + let handle = InterpretedWalReader::spawn( + streaming_wal_reader, + start_lsn, + records_tx, + shard, + PG_VERSION, + &Some("pageserver".to_string()), + ); + + let mut interpreted_records = Vec::new(); + while let Some(batch) = records_rx.recv().await { + interpreted_records.push(batch.records); + if batch.wal_end_lsn == batch.available_wal_end_lsn { + break; + } + } + + let received_next_record_lsns = interpreted_records + .into_iter() + .flat_map(|b| b.records) + .map(|rec| rec.next_record_lsn) + .collect::>(); + + // By default this also includes the start LSN. Trim it since it shouldn't be received. + let next_record_lsns = next_record_lsns.into_iter().skip(1).collect::>(); + + assert_eq!(received_next_record_lsns, next_record_lsns); + + handle.abort(); + let mut done = false; + for _ in 0..5 { + if handle.current_position().is_none() { + done = true; + break; + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + + assert!(done); + } } diff --git a/safekeeper/src/test_utils.rs b/safekeeper/src/test_utils.rs index e6f74185c1..618e2b59d2 100644 --- a/safekeeper/src/test_utils.rs +++ b/safekeeper/src/test_utils.rs @@ -1,3 +1,4 @@ +use std::ffi::CStr; use std::sync::Arc; use camino_tempfile::Utf8TempDir; @@ -124,6 +125,7 @@ impl Env { start_lsn: Lsn, msg_size: usize, msg_count: usize, + prefix: &CStr, mut next_record_lsns: Option<&mut Vec>, ) -> anyhow::Result { let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE); @@ -133,7 +135,6 @@ impl Env { WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0)); - let prefix = c"neon-file:"; let prefixlen = prefix.to_bytes_with_nul().len(); assert!(msg_size >= prefixlen); let message = vec![0; msg_size - prefixlen]; diff --git a/safekeeper/src/wal_reader_stream.rs b/safekeeper/src/wal_reader_stream.rs index cc9d4e6e3b..aab82fedb5 100644 --- a/safekeeper/src/wal_reader_stream.rs +++ b/safekeeper/src/wal_reader_stream.rs @@ -246,7 +246,7 @@ mod tests { .unwrap(); let resident_tli = tli.wal_residence_guard().await.unwrap(); - let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None) + let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None) .await .unwrap(); let end_pos = end_watch.get();