diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index c3f1bcc8da..40626196d5 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -16,6 +16,7 @@ use crate::reltag::{RelTag, SlruKind}; use crate::repository::Repository; use crate::repository::Timeline; use crate::walingest::WalIngest; +use crate::walrecord::DecodedWALRecord; use postgres_ffi::relfile_utils::*; use postgres_ffi::waldecoder::*; use postgres_ffi::xlog_utils::*; @@ -268,10 +269,12 @@ fn import_wal( waldecoder.feed_bytes(&buf); let mut nrecords = 0; + let mut modification = tline.begin_modification(last_lsn); + let mut decoded = DecodedWALRecord::default(); while last_lsn <= endpoint { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { - let mut modification = tline.begin_modification(lsn); - walingest.ingest_record(recdata, lsn, &mut modification)?; + modification.lsn = lsn; + walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?; last_lsn = lsn; nrecords += 1; @@ -384,10 +387,12 @@ pub fn import_wal_from_tar( waldecoder.feed_bytes(&bytes[offset..]); + let mut modification = tline.begin_modification(last_lsn); + let mut decoded = DecodedWALRecord::default(); while last_lsn <= end_lsn { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { - let mut modification = tline.begin_modification(lsn); - walingest.ingest_record(recdata, lsn, &mut modification)?; + modification.lsn = lsn; + walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?; last_lsn = lsn; debug!("imported record at {} (end {})", lsn, end_lsn); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 8d2ed0eff9..edea9915a8 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -546,12 +546,6 @@ pub struct DatadirModification<'a, R: Repository> { } impl<'a, R: Repository> DatadirModification<'a, R> { - pub fn clear(&mut self) { - self.pending_updates.clear(); - self.pending_deletions.clear(); - self.pending_nblocks = 0; - } - /// Initialize a completely new repository. /// /// This inserts the directory metadata entries that are assumed to @@ -914,14 +908,15 @@ impl<'a, R: Repository> DatadirModification<'a, R> { /// Finish this atomic update, writing all the updated keys to the /// underlying timeline. /// - pub fn commit(&self) -> Result<()> { + pub fn commit(&mut self) -> Result<()> { let pending_nblocks = self.pending_nblocks; + self.pending_nblocks = 0; - for (key, value) in &self.pending_updates { - self.writer.put(key.clone(), self.lsn, value.clone())?; + for (key, value) in self.pending_updates.drain() { + self.writer.put(key, self.lsn, value)?; } - for key_range in &self.pending_deletions { - self.writer.delete(key_range.clone(), self.lsn)?; + for key_range in self.pending_deletions.drain(..) { + self.writer.delete(key_range, self.lsn)?; } self.writer.finish_write(self.lsn); diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 6f405006db..35fb0f30eb 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -81,8 +81,10 @@ impl<'a, R: Repository> WalIngest<'a, R> { recdata: Bytes, lsn: Lsn, modification: &mut DatadirModification, + decoded: &mut DecodedWALRecord, ) -> Result<()> { - let mut decoded = decode_wal_record(recdata).context("failed decoding wal record")?; + decode_wal_record(recdata, decoded).context("failed decoding wal record")?; + let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); @@ -96,7 +98,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { if decoded.xl_rmid == pg_constants::RM_HEAP_ID || decoded.xl_rmid == pg_constants::RM_HEAP2_ID { - self.ingest_heapam_record(&mut buf, modification, &mut decoded)?; + self.ingest_heapam_record(&mut buf, modification, decoded)?; } // Handle other special record types if decoded.xl_rmid == pg_constants::RM_SMGR_ID @@ -211,7 +213,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { } } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID { let xlrec = XlRelmapUpdate::decode(&mut buf); - self.ingest_relmap_page(modification, &xlrec, &decoded)?; + self.ingest_relmap_page(modification, &xlrec, decoded)?; } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_NEXTOID { @@ -246,7 +248,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { // Iterate through all the blocks that the record modifies, and // "put" a separate copy of the record for each block. for blk in decoded.blocks.iter() { - self.ingest_decoded_block(modification, lsn, &decoded, blk)?; + self.ingest_decoded_block(modification, lsn, decoded, blk)?; } // If checkpoint data was updated, store the new version in the repository diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 0aa1a20785..9b3247981f 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -23,6 +23,7 @@ use crate::{ repository::{Repository, Timeline}, tenant_mgr, walingest::WalIngest, + walrecord::DecodedWALRecord, }; use postgres_ffi::waldecoder::WalStreamDecoder; use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId}; @@ -153,19 +154,19 @@ pub async fn handle_walreceiver_connection( // let mut n_records = 0; // timer = std::time::Instant::now(); { + let mut decoded = DecodedWALRecord::default(); let mut modification = timeline.begin_modification(last_rec_lsn); while let Some((lsn, recdata)) = waldecoder.poll_decode()? { - let _enter = info_span!("processing record", lsn = %lsn).entered(); + // let _enter = info_span!("processing record", lsn = %lsn).entered(); // It is important to deal with the aligned records as lsn in getPage@LSN is // aligned and can be several bytes bigger. Without this alignment we are // at risk of hitting a deadlock. ensure!(lsn.is_aligned()); - modification.clear(); modification.lsn = lsn; - walingest.ingest_record(recdata, lsn, &mut modification)?; + walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?; fail_point!("walreceiver-after-ingest"); diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 5a384360e2..105910e481 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -96,6 +96,7 @@ impl DecodedBkpBlock { } } +#[derive(Default)] pub struct DecodedWALRecord { pub xl_xid: TransactionId, pub xl_info: u8, @@ -505,7 +506,10 @@ impl XlMultiXactTruncate { // block data // ... // main data -pub fn decode_wal_record(record: Bytes) -> Result { +pub fn decode_wal_record( + record: Bytes, + decoded: &mut DecodedWALRecord, +) -> Result<(), DeserializeError> { let mut rnode_spcnode: u32 = 0; let mut rnode_dbnode: u32 = 0; let mut rnode_relnode: u32 = 0; @@ -534,7 +538,9 @@ pub fn decode_wal_record(record: Bytes) -> Result = Vec::new(); + if !decoded.blocks.is_empty() { + decoded.blocks.clear(); + } // 2. Decode the headers. // XLogRecordBlockHeaders if any, @@ -713,7 +719,7 @@ pub fn decode_wal_record(record: Bytes) -> Result { @@ -724,7 +730,7 @@ pub fn decode_wal_record(record: Bytes) -> Result Result