mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
safekeeper: don't skip empty records for shard zero (#11137)
## Problem Shard zero needs to track the start LSN of the latest record in adition to the LSN of the next record to ingest. The former is included in basebackup persisted by the compute in WAL. Previously, empty records were skipped for all shards. This caused the prev LSN tracking on the PS to fall behind and led to logical replication issues. ## Summary of changes Shard zero now receives emtpy interpreted records for LSN tracking purposes. A test is included too.
This commit is contained in:
@@ -430,7 +430,10 @@ impl InterpretedWalReader {
|
||||
.with_context(|| "Failed to interpret WAL")?;
|
||||
|
||||
for (shard, record) in interpreted {
|
||||
if record.is_empty() {
|
||||
// Shard zero needs to track the start LSN of the latest record
|
||||
// in adition to the LSN of the next record to ingest. The former
|
||||
// is included in basebackup persisted by the compute in WAL.
|
||||
if !shard.is_shard_zero() && record.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -740,7 +743,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
@@ -883,10 +886,16 @@ mod tests {
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let mut next_record_lsns = Vec::default();
|
||||
let end_watch =
|
||||
Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
|
||||
.await
|
||||
.unwrap();
|
||||
let end_watch = Env::write_wal(
|
||||
tli,
|
||||
start_lsn,
|
||||
SIZE,
|
||||
MSG_COUNT,
|
||||
c"neon-file:",
|
||||
Some(&mut next_record_lsns),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
let streaming_wal_reader = StreamingWalReader::new(
|
||||
@@ -1027,10 +1036,16 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch =
|
||||
Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
|
||||
.await
|
||||
.unwrap();
|
||||
let end_watch = Env::write_wal(
|
||||
tli,
|
||||
start_lsn,
|
||||
SIZE,
|
||||
MSG_COUNT,
|
||||
c"neon-file:",
|
||||
Some(&mut next_record_lsns),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(next_record_lsns.len() > 3);
|
||||
let shard_0_start_lsn = next_record_lsns[3];
|
||||
@@ -1124,4 +1139,88 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_shard_zero_does_not_skip_empty_records() {
|
||||
let _ = env_logger::builder().is_test(true).try_init();
|
||||
|
||||
const SIZE: usize = 8 * 1024;
|
||||
const MSG_COUNT: usize = 10;
|
||||
const PG_VERSION: u32 = 17;
|
||||
|
||||
let start_lsn = Lsn::from_str("0/149FD18").unwrap();
|
||||
let env = Env::new(true).unwrap();
|
||||
let tli = env
|
||||
.make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let mut next_record_lsns = Vec::new();
|
||||
let end_watch = Env::write_wal(
|
||||
tli,
|
||||
start_lsn,
|
||||
SIZE,
|
||||
MSG_COUNT,
|
||||
// This is a logical message prefix that is not persisted to key value storage.
|
||||
// We use it in order to validate that shard zero receives emtpy interpreted records.
|
||||
c"test:",
|
||||
Some(&mut next_record_lsns),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
let streaming_wal_reader = StreamingWalReader::new(
|
||||
resident_tli,
|
||||
None,
|
||||
start_lsn,
|
||||
end_pos,
|
||||
end_watch,
|
||||
MAX_SEND_SIZE,
|
||||
);
|
||||
|
||||
let shard = ShardIdentity::unsharded();
|
||||
let (records_tx, mut records_rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
|
||||
|
||||
let handle = InterpretedWalReader::spawn(
|
||||
streaming_wal_reader,
|
||||
start_lsn,
|
||||
records_tx,
|
||||
shard,
|
||||
PG_VERSION,
|
||||
&Some("pageserver".to_string()),
|
||||
);
|
||||
|
||||
let mut interpreted_records = Vec::new();
|
||||
while let Some(batch) = records_rx.recv().await {
|
||||
interpreted_records.push(batch.records);
|
||||
if batch.wal_end_lsn == batch.available_wal_end_lsn {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let received_next_record_lsns = interpreted_records
|
||||
.into_iter()
|
||||
.flat_map(|b| b.records)
|
||||
.map(|rec| rec.next_record_lsn)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// By default this also includes the start LSN. Trim it since it shouldn't be received.
|
||||
let next_record_lsns = next_record_lsns.into_iter().skip(1).collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(received_next_record_lsns, next_record_lsns);
|
||||
|
||||
handle.abort();
|
||||
let mut done = false;
|
||||
for _ in 0..5 {
|
||||
if handle.current_position().is_none() {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(1)).await;
|
||||
}
|
||||
|
||||
assert!(done);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::ffi::CStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
@@ -124,6 +125,7 @@ impl Env {
|
||||
start_lsn: Lsn,
|
||||
msg_size: usize,
|
||||
msg_count: usize,
|
||||
prefix: &CStr,
|
||||
mut next_record_lsns: Option<&mut Vec<Lsn>>,
|
||||
) -> anyhow::Result<EndWatch> {
|
||||
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
|
||||
@@ -133,7 +135,6 @@ impl Env {
|
||||
|
||||
WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
|
||||
|
||||
let prefix = c"neon-file:";
|
||||
let prefixlen = prefix.to_bytes_with_nul().len();
|
||||
assert!(msg_size >= prefixlen);
|
||||
let message = vec![0; msg_size - prefixlen];
|
||||
|
||||
@@ -246,7 +246,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
Reference in New Issue
Block a user