move decoded and modification to be inside the loop

This commit is contained in:
Thang Pham
2022-06-29 17:05:33 -04:00
parent 0e2462e5f2
commit 2eeb80b071

View File

@@ -129,9 +129,6 @@ pub async fn handle_walreceiver_connection(
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?;
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification();
while let Some(replication_message) = {
select! {
_ = cancellation.changed() => {
@@ -154,19 +151,23 @@ pub async fn handle_walreceiver_connection(
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// let _enter = info_span!("processing record", lsn = %lsn).entered();
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification();
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// let _enter = info_span!("processing record", lsn = %lsn).entered();
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
fail_point!("walreceiver-after-ingest");
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
last_rec_lsn = lsn;
}
}
if !caught_up && endlsn >= end_of_wal {