diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index f8a41e5b2b..6402657e05 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -16,6 +16,7 @@ use crate::reltag::{RelTag, SlruKind}; use crate::repository::Repository; use crate::repository::Timeline; use crate::walingest::WalIngest; +use crate::walrecord::DecodedWALRecord; use postgres_ffi::relfile_utils::*; use postgres_ffi::waldecoder::*; use postgres_ffi::xlog_utils::*; @@ -38,7 +39,7 @@ pub fn import_timeline_from_postgres_datadir( // TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn) // Then fishing out pg_control would be unnecessary - let mut modification = tline.begin_modification(lsn); + let mut modification = tline.begin_modification(); modification.init_empty()?; // Import all but pg_wal @@ -57,12 +58,12 @@ pub fn import_timeline_from_postgres_datadir( if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? { pg_control = Some(control_file); } - modification.flush()?; + modification.flush(lsn)?; } } // We're done importing all the data files. - modification.commit()?; + modification.commit(lsn)?; // We expect the Postgres server to be shut down cleanly. let pg_control = pg_control.context("pg_control file not found")?; @@ -268,9 +269,11 @@ fn import_wal( waldecoder.feed_bytes(&buf); let mut nrecords = 0; + let mut modification = tline.begin_modification(); + let mut decoded = DecodedWALRecord::default(); while last_lsn <= endpoint { if let Some((lsn, recdata)) = waldecoder.poll_decode()? 
{ - walingest.ingest_record(tline, recdata, lsn)?; + walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?; last_lsn = lsn; nrecords += 1; @@ -300,7 +303,7 @@ pub fn import_basebackup_from_tar( base_lsn: Lsn, ) -> Result<()> { info!("importing base at {}", base_lsn); - let mut modification = tline.begin_modification(base_lsn); + let mut modification = tline.begin_modification(); modification.init_empty()?; let mut pg_control: Option = None; @@ -318,7 +321,7 @@ pub fn import_basebackup_from_tar( // We found the pg_control file. pg_control = Some(res); } - modification.flush()?; + modification.flush(base_lsn)?; } tar::EntryType::Directory => { debug!("directory {:?}", file_path); @@ -332,7 +335,7 @@ pub fn import_basebackup_from_tar( // sanity check: ensure that pg_control is loaded let _pg_control = pg_control.context("pg_control file not found")?; - modification.commit()?; + modification.commit(base_lsn)?; Ok(()) } @@ -384,9 +387,11 @@ pub fn import_wal_from_tar( waldecoder.feed_bytes(&bytes[offset..]); + let mut modification = tline.begin_modification(); + let mut decoded = DecodedWALRecord::default(); while last_lsn <= end_lsn { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { - walingest.ingest_record(tline, recdata, lsn)?; + walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?; last_lsn = lsn; debug!("imported record at {} (end {})", lsn, end_lsn); diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index d622df531a..ce5cb57745 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -672,11 +672,21 @@ impl DeltaLayerWriter { /// The values must be appended in key, lsn order. 
/// pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> { + self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init()) + } + + pub fn put_value_bytes( + &mut self, + key: Key, + lsn: Lsn, + val: &[u8], + will_init: bool, + ) -> Result<()> { assert!(self.lsn_range.start <= lsn); - let off = self.blob_writer.write_blob(&Value::ser(&val)?)?; + let off = self.blob_writer.write_blob(val)?; - let blob_ref = BlobRef::new(off, val.will_init()); + let blob_ref = BlobRef::new(off, will_init); let delta_key = DeltaKey::from_key_lsn(&key, lsn); self.tree.append(&delta_key.0, blob_ref.0)?; diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 1f89f333dd..5f269a868f 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -15,6 +15,7 @@ use crate::layered_repository::storage_layer::{ use crate::repository::{Key, Value}; use crate::walrecord; use anyhow::{bail, ensure, Result}; +use std::cell::RefCell; use std::collections::HashMap; use tracing::*; use utils::{ @@ -30,6 +31,12 @@ use std::ops::Range; use std::path::PathBuf; use std::sync::RwLock; +thread_local! { + /// A buffer for serializing object during [`InMemoryLayer::put_value`]. + /// This buffer is reused for each serialization to avoid additional malloc calls. 
+ static SER_BUFFER: RefCell> = RefCell::new(Vec::new()); +} + pub struct InMemoryLayer { conf: &'static PageServerConf, tenantid: ZTenantId, @@ -278,10 +285,17 @@ impl InMemoryLayer { pub fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> { trace!("put_value key {} at {}/{}", key, self.timelineid, lsn); let mut inner = self.inner.write().unwrap(); - inner.assert_writeable(); - let off = inner.file.write_blob(&Value::ser(val)?)?; + let off = { + SER_BUFFER.with(|x| -> Result<_> { + let mut buf = x.borrow_mut(); + buf.clear(); + val.ser_into(&mut (*buf))?; + let off = inner.file.write_blob(&buf)?; + Ok(off) + })? + }; let vec_map = inner.index.entry(key).or_default(); let old = vec_map.append_or_update_last(lsn, off).unwrap().0; @@ -350,8 +364,8 @@ impl InMemoryLayer { // Write all page versions for (lsn, pos) in vec_map.as_slice() { cursor.read_blob_into_buf(*pos, &mut buf)?; - let val = Value::des(&buf)?; - delta_layer_writer.put_value(key, *lsn, val)?; + let will_init = Value::des(&buf)?.will_init(); + delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?; } } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index f696c1f411..788c9de29e 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -80,23 +80,25 @@ impl DatadirTimeline { /// the timeline. /// /// This provides a transaction-like interface to perform a bunch - /// of modifications atomically, all stamped with one LSN. + /// of modifications atomically. /// - /// To ingest a WAL record, call begin_modification(lsn) to get a + /// To ingest a WAL record, call begin_modification() to get a /// DatadirModification object. Use the functions in the object to /// modify the repository state, updating all the pages and metadata - /// that the WAL record affects. When you're done, call commit() to - /// commit the changes. + /// that the WAL record affects. 
When you're done, call commit(lsn) to + /// commit the changes. All the changes will be stamped with the specified LSN. + /// + /// Calling commit(lsn) will flush all the changes and reset the state, + /// so the `DatadirModification` struct can be reused to perform the next modification. /// /// Note that any pending modifications you make through the /// modification object won't be visible to calls to the 'get' and list /// functions of the timeline until you finish! And if you update the /// same page twice, the last update wins. /// - pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification { + pub fn begin_modification(&self) -> DatadirModification { DatadirModification { tline: self, - lsn, pending_updates: HashMap::new(), pending_deletions: Vec::new(), pending_nblocks: 0, @@ -533,8 +535,6 @@ pub struct DatadirModification<'a, R: Repository> { /// in the state in 'tline' yet. pub tline: &'a DatadirTimeline, - lsn: Lsn, - // The modifications are not applied directly to the underlying key-value store. // The put-functions add the modifications here, and they are flushed to the // underlying key-value store by the 'finish' function. @@ -920,7 +920,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> { /// retains all the metadata, but data pages are flushed. That's again OK /// for bulk import, where you are just loading data pages and won't try to /// modify the same pages twice. - pub fn flush(&mut self) -> Result<()> { + pub fn flush(&mut self, lsn: Lsn) -> Result<()> { // Unless we have accumulated a decent amount of changes, it's not worth it // to scan through the pending_updates list. 
let pending_nblocks = self.pending_nblocks; @@ -934,7 +934,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> { let mut result: Result<()> = Ok(()); self.pending_updates.retain(|&key, value| { if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) { - result = writer.put(key, self.lsn, value); + result = writer.put(key, lsn, value); false } else { true @@ -956,20 +956,22 @@ impl<'a, R: Repository> DatadirModification<'a, R> { /// /// Finish this atomic update, writing all the updated keys to the /// underlying timeline. + /// All the modifications in this atomic update are stamped by the specified LSN. /// - pub fn commit(self) -> Result<()> { + pub fn commit(&mut self, lsn: Lsn) -> Result<()> { let writer = self.tline.tline.writer(); let pending_nblocks = self.pending_nblocks; + self.pending_nblocks = 0; - for (key, value) in self.pending_updates { - writer.put(key, self.lsn, &value)?; + for (key, value) in self.pending_updates.drain() { + writer.put(key, lsn, &value)?; } - for key_range in self.pending_deletions { - writer.delete(key_range.clone(), self.lsn)?; + for key_range in self.pending_deletions.drain(..) 
{ + writer.delete(key_range, lsn)?; } - writer.finish_write(self.lsn); + writer.finish_write(lsn); if pending_nblocks != 0 { self.tline.current_logical_size.fetch_add( @@ -1407,9 +1409,9 @@ pub fn create_test_timeline( ) -> Result>> { let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?; let tline = DatadirTimeline::new(tline, 256 * 1024); - let mut m = tline.begin_modification(Lsn(8)); + let mut m = tline.begin_modification(); m.init_empty()?; - m.commit()?; + m.commit(Lsn(8))?; Ok(Arc::new(tline)) } diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 2f39007e9f..adc24328ae 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -78,13 +78,13 @@ impl<'a, R: Repository> WalIngest<'a, R> { /// pub fn ingest_record( &mut self, - timeline: &DatadirTimeline, recdata: Bytes, lsn: Lsn, + modification: &mut DatadirModification, + decoded: &mut DecodedWALRecord, ) -> Result<()> { - let mut modification = timeline.begin_modification(lsn); + decode_wal_record(recdata, decoded).context("failed decoding wal record")?; - let mut decoded = decode_wal_record(recdata).context("failed decoding wal record")?; let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); @@ -98,7 +98,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { if decoded.xl_rmid == pg_constants::RM_HEAP_ID || decoded.xl_rmid == pg_constants::RM_HEAP2_ID { - self.ingest_heapam_record(&mut buf, &mut modification, &mut decoded)?; + self.ingest_heapam_record(&mut buf, modification, decoded)?; } // Handle other special record types if decoded.xl_rmid == pg_constants::RM_SMGR_ID @@ -106,19 +106,19 @@ impl<'a, R: Repository> WalIngest<'a, R> { == pg_constants::XLOG_SMGR_CREATE { let create = XlSmgrCreate::decode(&mut buf); - self.ingest_xlog_smgr_create(&mut modification, &create)?; + self.ingest_xlog_smgr_create(modification, &create)?; } else if decoded.xl_rmid == pg_constants::RM_SMGR_ID && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == 
pg_constants::XLOG_SMGR_TRUNCATE { let truncate = XlSmgrTruncate::decode(&mut buf); - self.ingest_xlog_smgr_truncate(&mut modification, &truncate)?; + self.ingest_xlog_smgr_truncate(modification, &truncate)?; } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID { if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE { let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(&mut modification, &createdb)?; + self.ingest_xlog_dbase_create(modification, &createdb)?; } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_DROP { @@ -137,7 +137,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; self.put_slru_page_image( - &mut modification, + modification, SlruKind::Clog, segno, rpageno, @@ -146,7 +146,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { } else { assert!(info == pg_constants::CLOG_TRUNCATE); let xlrec = XlClogTruncate::decode(&mut buf); - self.ingest_clog_truncate_record(&mut modification, &xlrec)?; + self.ingest_clog_truncate_record(modification, &xlrec)?; } } else if decoded.xl_rmid == pg_constants::RM_XACT_ID { let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; @@ -154,7 +154,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); self.ingest_xact_record( - &mut modification, + modification, &parsed_xact, info == pg_constants::XLOG_XACT_COMMIT, )?; @@ -164,7 +164,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); self.ingest_xact_record( - &mut modification, + modification, &parsed_xact, info == pg_constants::XLOG_XACT_COMMIT_PREPARED, )?; @@ -187,7 +187,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let 
rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; self.put_slru_page_image( - &mut modification, + modification, SlruKind::MultiXactOffsets, segno, rpageno, @@ -198,7 +198,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; self.put_slru_page_image( - &mut modification, + modification, SlruKind::MultiXactMembers, segno, rpageno, @@ -206,14 +206,14 @@ impl<'a, R: Repository> WalIngest<'a, R> { )?; } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { let xlrec = XlMultiXactCreate::decode(&mut buf); - self.ingest_multixact_create_record(&mut modification, &xlrec)?; + self.ingest_multixact_create_record(modification, &xlrec)?; } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { let xlrec = XlMultiXactTruncate::decode(&mut buf); - self.ingest_multixact_truncate_record(&mut modification, &xlrec)?; + self.ingest_multixact_truncate_record(modification, &xlrec)?; } } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID { let xlrec = XlRelmapUpdate::decode(&mut buf); - self.ingest_relmap_page(&mut modification, &xlrec, &decoded)?; + self.ingest_relmap_page(modification, &xlrec, decoded)?; } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_NEXTOID { @@ -248,7 +248,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { // Iterate through all the blocks that the record modifies, and // "put" a separate copy of the record for each block. 
for blk in decoded.blocks.iter() { - self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?; + self.ingest_decoded_block(modification, lsn, decoded, blk)?; } // If checkpoint data was updated, store the new version in the repository @@ -261,7 +261,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { // Now that this record has been fully handled, including updating the // checkpoint data, let the repository know that it is up-to-date to this LSN - modification.commit()?; + modification.commit(lsn)?; Ok(()) } @@ -1069,10 +1069,10 @@ mod tests { static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]); fn init_walingest_test(tline: &DatadirTimeline) -> Result> { - let mut m = tline.begin_modification(Lsn(0x10)); + let mut m = tline.begin_modification(); m.put_checkpoint(ZERO_CHECKPOINT.clone())?; m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file - m.commit()?; + m.commit(Lsn(0x10))?; let walingest = WalIngest::new(tline, Lsn(0x10))?; Ok(walingest) @@ -1084,19 +1084,19 @@ mod tests { let tline = create_test_timeline(repo, TIMELINE_ID)?; let mut walingest = init_walingest_test(&tline)?; - let mut m = tline.begin_modification(Lsn(0x20)); + let mut m = tline.begin_modification(); walingest.put_rel_creation(&mut m, TESTREL_A)?; walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?; - m.commit()?; - let mut m = tline.begin_modification(Lsn(0x30)); + m.commit(Lsn(0x20))?; + let mut m = tline.begin_modification(); walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"))?; - m.commit()?; - let mut m = tline.begin_modification(Lsn(0x40)); + m.commit(Lsn(0x30))?; + let mut m = tline.begin_modification(); walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"))?; - m.commit()?; - let mut m = tline.begin_modification(Lsn(0x50)); + m.commit(Lsn(0x40))?; + let mut m = tline.begin_modification(); walingest.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 
5"))?; - m.commit()?; + m.commit(Lsn(0x50))?; assert_current_logical_size(&tline, Lsn(0x50)); @@ -1142,9 +1142,9 @@ mod tests { ); // Truncate last block - let mut m = tline.begin_modification(Lsn(0x60)); + let mut m = tline.begin_modification(); walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?; - m.commit()?; + m.commit(Lsn(0x60))?; assert_current_logical_size(&tline, Lsn(0x60)); // Check reported size and contents after truncation @@ -1166,15 +1166,15 @@ mod tests { ); // Truncate to zero length - let mut m = tline.begin_modification(Lsn(0x68)); + let mut m = tline.begin_modification(); walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?; - m.commit()?; + m.commit(Lsn(0x68))?; assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68))?, 0); // Extend from 0 to 2 blocks, leaving a gap - let mut m = tline.begin_modification(Lsn(0x70)); + let mut m = tline.begin_modification(); walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))?; - m.commit()?; + m.commit(Lsn(0x70))?; assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70))?, 2); assert_eq!( tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70))?, @@ -1186,9 +1186,9 @@ mod tests { ); // Extend a lot more, leaving a big gap that spans across segments - let mut m = tline.begin_modification(Lsn(0x80)); + let mut m = tline.begin_modification(); walingest.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))?; - m.commit()?; + m.commit(Lsn(0x80))?; assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, 1501); for blk in 2..1500 { assert_eq!( @@ -1212,18 +1212,18 @@ mod tests { let tline = create_test_timeline(repo, TIMELINE_ID)?; let mut walingest = init_walingest_test(&tline)?; - let mut m = tline.begin_modification(Lsn(0x20)); + let mut m = tline.begin_modification(); walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?; - m.commit()?; + m.commit(Lsn(0x20))?; // Check that rel exists and size is correct assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true); 
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, 1); // Drop rel - let mut m = tline.begin_modification(Lsn(0x30)); + let mut m = tline.begin_modification(); walingest.put_rel_drop(&mut m, TESTREL_A)?; - m.commit()?; + m.commit(Lsn(0x30))?; // Check that rel is not visible anymore assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30))?, false); @@ -1232,9 +1232,9 @@ mod tests { //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30))?.is_none()); // Re-create it - let mut m = tline.begin_modification(Lsn(0x40)); + let mut m = tline.begin_modification(); walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"))?; - m.commit()?; + m.commit(Lsn(0x40))?; // Check that rel exists and size is correct assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40))?, true); @@ -1254,12 +1254,12 @@ mod tests { // Create a 20 MB relation (the size is arbitrary) let relsize = 20 * 1024 * 1024 / 8192; - let mut m = tline.begin_modification(Lsn(0x20)); + let mut m = tline.begin_modification(); for blkno in 0..relsize { let data = format!("foo blk {} at {}", blkno, Lsn(0x20)); walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?; } - m.commit()?; + m.commit(Lsn(0x20))?; // The relation was created at LSN 20, not visible at LSN 1 yet. assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false); @@ -1280,9 +1280,9 @@ mod tests { // Truncate relation so that second segment was dropped // - only leave one page - let mut m = tline.begin_modification(Lsn(0x60)); + let mut m = tline.begin_modification(); walingest.put_rel_truncation(&mut m, TESTREL_A, 1)?; - m.commit()?; + m.commit(Lsn(0x60))?; // Check reported size and contents after truncation assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 1); @@ -1310,12 +1310,12 @@ mod tests { // Extend relation again. 
// Add enough blocks to create second segment let lsn = Lsn(0x80); - let mut m = tline.begin_modification(lsn); + let mut m = tline.begin_modification(); for blkno in 0..relsize { let data = format!("foo blk {} at {}", blkno, lsn); walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?; } - m.commit()?; + m.commit(lsn)?; assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80))?, true); assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, relsize); @@ -1343,10 +1343,10 @@ mod tests { let mut lsn = 0x10; for blknum in 0..pg_constants::RELSEG_SIZE + 1 { lsn += 0x10; - let mut m = tline.begin_modification(Lsn(lsn)); + let mut m = tline.begin_modification(); let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn))); walingest.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img)?; - m.commit()?; + m.commit(Lsn(lsn))?; } assert_current_logical_size(&tline, Lsn(lsn)); @@ -1358,9 +1358,9 @@ mod tests { // Truncate one block lsn += 0x10; - let mut m = tline.begin_modification(Lsn(lsn)); + let mut m = tline.begin_modification(); walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE)?; - m.commit()?; + m.commit(Lsn(lsn))?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn))?, pg_constants::RELSEG_SIZE @@ -1369,9 +1369,9 @@ mod tests { // Truncate another block lsn += 0x10; - let mut m = tline.begin_modification(Lsn(lsn)); + let mut m = tline.begin_modification(); walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE - 1)?; - m.commit()?; + m.commit(Lsn(lsn))?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn))?, pg_constants::RELSEG_SIZE - 1 @@ -1383,9 +1383,9 @@ mod tests { let mut size: i32 = 3000; while size >= 0 { lsn += 0x10; - let mut m = tline.begin_modification(Lsn(lsn)); + let mut m = tline.begin_modification(); walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?; - m.commit()?; + m.commit(Lsn(lsn))?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn))?, size as 
BlockNumber diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 0c8c0ae2f6..cc1a9cc5eb 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -23,6 +23,7 @@ use crate::{ repository::{Repository, Timeline}, tenant_mgr, walingest::WalIngest, + walrecord::DecodedWALRecord, }; use postgres_ffi::waldecoder::WalStreamDecoder; use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId}; @@ -150,19 +151,25 @@ pub async fn handle_walreceiver_connection( waldecoder.feed_bytes(data); - while let Some((lsn, recdata)) = waldecoder.poll_decode()? { - let _enter = info_span!("processing record", lsn = %lsn).entered(); + { + let mut decoded = DecodedWALRecord::default(); + let mut modification = timeline.begin_modification(); + while let Some((lsn, recdata)) = waldecoder.poll_decode()? { + // let _enter = info_span!("processing record", lsn = %lsn).entered(); - // It is important to deal with the aligned records as lsn in getPage@LSN is - // aligned and can be several bytes bigger. Without this alignment we are - // at risk of hitting a deadlock. - ensure!(lsn.is_aligned()); + // It is important to deal with the aligned records as lsn in getPage@LSN is + // aligned and can be several bytes bigger. Without this alignment we are + // at risk of hitting a deadlock. 
+ ensure!(lsn.is_aligned()); - walingest.ingest_record(&timeline, recdata, lsn)?; + walingest + .ingest_record(recdata, lsn, &mut modification, &mut decoded) + .with_context(|| format!("could not ingest record at {lsn}"))?; - fail_point!("walreceiver-after-ingest"); + fail_point!("walreceiver-after-ingest"); - last_rec_lsn = lsn; + last_rec_lsn = lsn; + } }