mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 13:30:38 +00:00
wal_decoder: rename end_lsn to next_record_lsn (#9776)
## Problem It turns out that `WalStreamDecoder::poll_decode` returns the start LSN of the next record and not the end LSN of the current record. They are not always equal. For example, they're not equal when the record in question is an XLOG SWITCH record. ## Summary of changes Rename things to reflect that.
This commit is contained in:
@@ -331,11 +331,11 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
while let Some((record_end_lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
// at risk of hitting a deadlock.
|
||||
if !record_end_lsn.is_aligned() {
|
||||
if !next_record_lsn.is_aligned() {
|
||||
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
|
||||
}
|
||||
|
||||
@@ -343,7 +343,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
modification.tline.get_shard_identity(),
|
||||
record_end_lsn,
|
||||
next_record_lsn,
|
||||
modification.tline.pg_version,
|
||||
)?;
|
||||
|
||||
@@ -367,10 +367,10 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("could not ingest record at {record_end_lsn}")
|
||||
format!("could not ingest record at {next_record_lsn}")
|
||||
})?;
|
||||
if !ingested {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {record_end_lsn}");
|
||||
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
filtered_records += 1;
|
||||
}
|
||||
@@ -380,7 +380,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// to timeout the tests.
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
|
||||
last_rec_lsn = record_end_lsn;
|
||||
last_rec_lsn = next_record_lsn;
|
||||
|
||||
// Commit every ingest_batch_size records. Even if we filtered out
|
||||
// all records, we still need to call commit to advance the LSN.
|
||||
|
||||
@@ -154,7 +154,7 @@ impl WalIngest {
|
||||
WAL_INGEST.records_received.inc();
|
||||
let prev_len = modification.len();
|
||||
|
||||
modification.set_lsn(interpreted.end_lsn)?;
|
||||
modification.set_lsn(interpreted.next_record_lsn)?;
|
||||
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
|
||||
// Records of this type should always be preceded by a commit(), as they
|
||||
|
||||
Reference in New Issue
Block a user