diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 7b93146b4c..93d1745a08 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -129,9 +129,6 @@ pub async fn handle_walreceiver_connection( let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?; - let mut decoded = DecodedWALRecord::default(); - let mut modification = timeline.begin_modification(); - while let Some(replication_message) = { select! { _ = cancellation.changed() => { @@ -154,19 +151,23 @@ pub async fn handle_walreceiver_connection( waldecoder.feed_bytes(data); - while let Some((lsn, recdata)) = waldecoder.poll_decode()? { - // let _enter = info_span!("processing record", lsn = %lsn).entered(); + { + let mut decoded = DecodedWALRecord::default(); + let mut modification = timeline.begin_modification(); + while let Some((lsn, recdata)) = waldecoder.poll_decode()? { + // let _enter = info_span!("processing record", lsn = %lsn).entered(); - // It is important to deal with the aligned records as lsn in getPage@LSN is - // aligned and can be several bytes bigger. Without this alignment we are - // at risk of hitting a deadlock. - ensure!(lsn.is_aligned()); + // It is important to deal with the aligned records as lsn in getPage@LSN is + // aligned and can be several bytes bigger. Without this alignment we are + // at risk of hitting a deadlock. + ensure!(lsn.is_aligned()); - walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?; + walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?; - fail_point!("walreceiver-after-ingest"); + fail_point!("walreceiver-after-ingest"); - last_rec_lsn = lsn; + last_rec_lsn = lsn; + } } if !caught_up && endlsn >= end_of_wal {